From ceec5b747c618435274fa4c4a1e28f493cd4035d Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 25 Nov 2024 21:20:50 +0100 Subject: [PATCH] dspinner: RecursiveKeys(): do not hang on cancellations Per https://github.com/ipfs/kubo/issues/10593, if no one is reading from the channel returned by RecursiveKeys() and the context is cancelled, streamIndex will hang indefinitely. Proposed fix is to always select when attempting to write to the `out` channel. If the context is done and there is no one to read, we can abort. --- CHANGELOG.md | 3 ++- pinning/pinner/dspinner/pin.go | 19 +++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6d32d0b6d..5081c49f1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,7 +81,8 @@ The following emojis are used to highlight certain changes: ### Fixed -- `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393) +- `unixfs/hamt`: Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393) +- `pinner/dspinner`: do not hang when listing keys and the `out` channel is no longer read [#727](https://github.com/ipfs/boxo/pull/727) ### Security diff --git a/pinning/pinner/dspinner/pin.go b/pinning/pinner/dspinner/pin.go index bc1f61902..ddc93c2c5 100644 --- a/pinning/pinner/dspinner/pin.go +++ b/pinning/pinner/dspinner/pin.go @@ -707,11 +707,19 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile defer p.lock.RUnlock() cidSet := cid.NewSet() + send := func(sp ipfspinner.StreamedPin) (ok bool) { + select { + case <-ctx.Done(): + return false + case out <- sp: + return true + } + } err := index.ForEach(ctx, "", func(key, value string) bool { c, err := cid.Cast([]byte(key)) if err != nil { - out <- ipfspinner.StreamedPin{Err: err} + send(ipfspinner.StreamedPin{Err: err}) return false } @@ -719,7 +727,7 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile if detailed { pp, err := p.loadPin(ctx, value) if err != nil { - out <- ipfspinner.StreamedPin{Err: err} + send(ipfspinner.StreamedPin{Err: err}) return false } @@ -731,17 +739,16 @@ func (p *pinner) streamIndex(ctx context.Context, index dsindex.Indexer, detaile } if !cidSet.Has(c) { - select { - case <-ctx.Done(): + if !send(ipfspinner.StreamedPin{Pin: pin}) { return false - case out <- ipfspinner.StreamedPin{Pin: pin}: } cidSet.Add(c) } return true }) if err != nil { - out <- ipfspinner.StreamedPin{Err: err} + send(ipfspinner.StreamedPin{Err: err}) + return } }()