Skip to content

Commit

Permalink
fix(bitswap/client/msgq): prevent duplicate requests
Browse files Browse the repository at this point in the history
Previously, in-progress requests could be re-requested again during periodic rebroadcast.
The queue requests, and while awaiting response, the rebroadcast event happens.
Rebroadcast event changes previosly sent WANTs to pending and sends them again in a new message.

The solution here is to ensure WANT was in sent status for long enough, before bringing it back to pending.
This utilizes existing `sendAt` map which tracks when every CID was sent.
  • Loading branch information
Wondertan committed Oct 29, 2024
1 parent 19bcc75 commit 5dc309b
Showing 1 changed file with 31 additions and 11 deletions.
42 changes: 31 additions & 11 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,26 @@ func (r *recallWantlist) ClearSentAt(c cid.Cid) {
delete(r.sentAt, c)
}

// Refresh moves wants from the sent list back to the pending list.
// If a want has been sent for longer than the interval, it is moved back to the pending list.
// Returns the number of wants that were refreshed.
func (r *recallWantlist) Refresh(now time.Time, interval time.Duration) int {
// Invalidate the cache up-front to avoid doing any work trying to keep it up-to-date.
r.pending.Absorb(nil)

var refreshed int
for _, want := range r.sent.Entries() {
sentAt, ok := r.sentAt[want.Cid]
if ok && now.Sub(sentAt) >= interval {
r.pending.Add(want.Cid, want.Priority, want.WantType)
r.sent.Remove(want.Cid)
refreshed++
}
}

return refreshed
}

type peerConn struct {
p peer.ID
network MessageNetwork
Expand Down Expand Up @@ -476,27 +496,27 @@ func (mq *MessageQueue) rebroadcastWantlist() {
mq.rebroadcastIntervalLk.Unlock()

// If some wants were transferred from the rebroadcast list
if mq.transferRebroadcastWants() {
if toRebroadcast := mq.transferRebroadcastWants(); toRebroadcast > 0 {
// Send them out
mq.sendMessage()
log.Infow("Rebroadcasting wants", "amount", toRebroadcast, "peer", mq.p)
}
}

// Transfer wants from the rebroadcast lists into the pending lists.
func (mq *MessageQueue) transferRebroadcastWants() bool {
func (mq *MessageQueue) transferRebroadcastWants() int {
mq.wllock.Lock()
defer mq.wllock.Unlock()

// Check if there are any wants to rebroadcast
if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 {
return false
}

// Copy sent wants into pending wants lists
mq.bcstWants.pending.Absorb(mq.bcstWants.sent)
mq.peerWants.pending.Absorb(mq.peerWants.sent)
mq.rebroadcastIntervalLk.Lock()
rebroadcastInterval := mq.rebroadcastInterval
mq.rebroadcastIntervalLk.Unlock()

return true
now := mq.clock.Now()
// Transfer sent wants into pending wants lists
transferred := mq.bcstWants.Refresh(now, rebroadcastInterval)
transferred += mq.peerWants.Refresh(now, rebroadcastInterval)
return transferred
}

func (mq *MessageQueue) signalWorkReady() {
Expand Down

0 comments on commit 5dc309b

Please sign in to comment.