Skip to content

Commit

Permalink
dspinner: RecursiveKeys(): do not hang on cancellations
Browse files Browse the repository at this point in the history
Per ipfs/kubo#10593, if no one
is reading from the channel returned by RecursiveKeys() and the context is cancelled, streamIndex will hang indefinitely.

Proposed fix is to always select when attempting to write to the `out` channel. If the context is done and there is no one to read, we can abort.
  • Loading branch information
hsanjuan committed Nov 25, 2024
1 parent c91cc1d commit ceec5b7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ The following emojis are used to highlight certain changes:

### Fixed

- `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)
- `unixfs/hamt`: Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)
- `pinner/dspinner`: do not hang when listing keys and the `out` channel is no longer read [#727](https://github.com/ipfs/boxo/pull/727)

### Security

Expand Down
19 changes: 13 additions & 6 deletions pinning/pinner/dspinner/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,19 +707,27 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile
defer p.lock.RUnlock()

cidSet := cid.NewSet()
send := func(sp ipfspinner.StreamedPin) (ok bool) {
select {
case <-ctx.Done():
return false

Check warning on line 713 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L712-L713

Added lines #L712 - L713 were not covered by tests
case out <- sp:
return true
}
}

err := index.ForEach(ctx, "", func(key, value string) bool {
c, err := cid.Cast([]byte(key))
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
send(ipfspinner.StreamedPin{Err: err})

Check warning on line 722 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L722

Added line #L722 was not covered by tests
return false
}

var pin ipfspinner.Pinned
if detailed {
pp, err := p.loadPin(ctx, value)
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
send(ipfspinner.StreamedPin{Err: err})

Check warning on line 730 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L730

Added line #L730 was not covered by tests
return false
}

Expand All @@ -731,17 +739,16 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile
}

if !cidSet.Has(c) {
select {
case <-ctx.Done():
if !send(ipfspinner.StreamedPin{Pin: pin}) {
return false
case out <- ipfspinner.StreamedPin{Pin: pin}:
}
cidSet.Add(c)
}
return true
})
if err != nil {
out <- ipfspinner.StreamedPin{Err: err}
send(ipfspinner.StreamedPin{Err: err})
return

Check warning on line 751 in pinning/pinner/dspinner/pin.go

View check run for this annotation

Codecov / codecov/patch

pinning/pinner/dspinner/pin.go#L750-L751

Added lines #L750 - L751 were not covered by tests
}
}()

Expand Down

0 comments on commit ceec5b7

Please sign in to comment.