From 5dc309b2bcd150b9db77c1103d2f2fbacbb1cbf3 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Tue, 29 Oct 2024 20:16:45 +0100 Subject: [PATCH] fix(bitswap/client/msgq): prevent duplicate requests Previously, in-progress requests could be re-requested again during periodic rebroadcast. The queue requests, and while awaiting response, the rebroadcast event happens. Rebroadcast event changes previosly sent WANTs to pending and sends them again in a new message. The solution here is to ensure WANT was in sent status for long enough, before bringing it back to pending. This utilizes existing `sendAt` map which tracks when every CID was sent. --- .../internal/messagequeue/messagequeue.go | 42 ++++++++++++++----- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index edea20b9c..516c2cc0f 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -175,6 +175,26 @@ func (r *recallWantlist) ClearSentAt(c cid.Cid) { delete(r.sentAt, c) } +// Refresh moves wants from the sent list back to the pending list. +// If a want has been sent for longer than the interval, it is moved back to the pending list. +// Returns the number of wants that were refreshed. +func (r *recallWantlist) Refresh(now time.Time, interval time.Duration) int { + // Invalidate the cache up-front to avoid doing any work trying to keep it up-to-date. + r.pending.Absorb(nil) + + var refreshed int + for _, want := range r.sent.Entries() { + sentAt, ok := r.sentAt[want.Cid] + if ok && now.Sub(sentAt) >= interval { + r.pending.Add(want.Cid, want.Priority, want.WantType) + r.sent.Remove(want.Cid) + refreshed++ + } + } + + return refreshed +} + type peerConn struct { p peer.ID network MessageNetwork @@ -476,27 +496,27 @@ func (mq *MessageQueue) rebroadcastWantlist() { mq.rebroadcastIntervalLk.Unlock() // If some wants were transferred from the rebroadcast list - if mq.transferRebroadcastWants() { + if toRebroadcast := mq.transferRebroadcastWants(); toRebroadcast > 0 { // Send them out mq.sendMessage() + log.Infow("Rebroadcasting wants", "amount", toRebroadcast, "peer", mq.p) } } // Transfer wants from the rebroadcast lists into the pending lists. -func (mq *MessageQueue) transferRebroadcastWants() bool { +func (mq *MessageQueue) transferRebroadcastWants() int { mq.wllock.Lock() defer mq.wllock.Unlock() - // Check if there are any wants to rebroadcast - if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 { - return false - } - - // Copy sent wants into pending wants lists - mq.bcstWants.pending.Absorb(mq.bcstWants.sent) - mq.peerWants.pending.Absorb(mq.peerWants.sent) + mq.rebroadcastIntervalLk.Lock() + rebroadcastInterval := mq.rebroadcastInterval + mq.rebroadcastIntervalLk.Unlock() - return true + now := mq.clock.Now() + // Transfer sent wants into pending wants lists + transferred := mq.bcstWants.Refresh(now, rebroadcastInterval) + transferred += mq.peerWants.Refresh(now, rebroadcastInterval) + return transferred } func (mq *MessageQueue) signalWorkReady() {