
Merge pull request #706 from ipfs/release-v0.24.1
Release v0.24.1
gammazero authored Oct 25, 2024
2 parents 2fdde02 + 88cdbca commit a9085af
Showing 27 changed files with 244 additions and 169 deletions.
9 changes: 5 additions & 4 deletions .github/workflows/gateway-sharness.yml
@@ -16,9 +16,9 @@ jobs:
shell: bash
steps:
- name: Setup Go
- uses: actions/setup-go@v4
+ uses: actions/setup-go@v5
with:
- go-version: 1.22.x
+ go-version: 1.23.x
- name: Checkout boxo
uses: actions/checkout@v3
with:
@@ -34,10 +34,11 @@ jobs:
run: |
go mod edit -replace=github.com/ipfs/boxo=../boxo
make mod_tidy
- cat go.mod
working-directory: kubo
- name: Install sharness dependencies
- run: make test_sharness_deps
+ run: |
+   find . -name go.mod -execdir go mod tidy \;
+   make test_sharness_deps
working-directory: kubo
- name: Run Kubo Sharness Tests
run: find . -maxdepth 1 -name "*gateway*.sh" -print0 | xargs -0 -I {} bash -c "echo {}; {}"
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -24,6 +24,21 @@ The following emojis are used to highlight certain changes:

### Security

## [v0.24.1]

### Changed

- `routing/http/client`: creating a delegated routing client with `New` now defaults to querying the delegated routing server with `DefaultProtocolFilter` ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#689](https://github.com/ipfs/boxo/pull/689) (see the sketch after this list)
- updated go-libp2p to [v0.36.5](https://github.com/libp2p/go-libp2p/releases/tag/v0.36.5)
- updated dependencies [#693](https://github.com/ipfs/boxo/pull/693)
- update `go-libp2p-kad-dht` to [v0.27.0](https://github.com/libp2p/go-libp2p-kad-dht/releases/tag/v0.27.0)
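
A rough sketch of what the new default means for callers. `New` and `DefaultProtocolFilter` are named in the entry above; the `WithProtocolFilter` option and the exact signatures are assumptions based on [#689](https://github.com/ipfs/boxo/pull/689), not a definitive API reference:

```go
package main

import (
	"fmt"

	"github.com/ipfs/boxo/routing/http/client"
)

func main() {
	// As of v0.24.1, a client built with New queries the delegated routing
	// endpoint with DefaultProtocolFilter applied (per IPIP-484), with no
	// extra configuration here.
	c, err := client.New("https://delegated-ipfs.dev")
	if err != nil {
		panic(err)
	}
	fmt.Printf("routing client ready: %T\n", c)

	// Assumption from #689: callers that need a different filter pass an
	// explicit option; the option name below may differ in the real package.
	filtered, err := client.New("https://delegated-ipfs.dev",
		client.WithProtocolFilter([]string{"transport-bitswap"}))
	if err != nil {
		panic(err)
	}
	_ = filtered
}
```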

### Fixed

- `routing/http/client`: optional address and protocol filter parameters from [IPIP-484](https://github.com/ipfs/specs/pull/484) use human-readable `,` instead of `%2C`. [#688](https://github.com/ipfs/boxo/pull/688)
- `bitswap/client`: clean up live wants when wants are canceled. This prevents canceled wants from continuing to be rebroadcast. [#690](https://github.com/ipfs/boxo/pull/690)
- Fix a problem where invalid CIDs could be added to the exhausted wants list, resulting in a possible performance issue. [#692](https://github.com/ipfs/boxo/pull/692)

## [v0.24.0]

### Added
6 changes: 1 addition & 5 deletions bitswap/client/client.go
@@ -167,7 +167,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
network: network,
process: px,
pm: pm,
- pqm: pqm,
sm: sm,
sim: sim,
notif: notif,
@@ -184,7 +183,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
option(bs)
}

- bs.pqm.Startup()
+ pqm.Startup()

// bind the context and process.
// do it over here to avoid closing before all setup is done.
@@ -203,9 +202,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
type Client struct {
pm *bspm.PeerManager

- // the provider query manager manages requests to find providers
- pqm *bspqm.ProviderQueryManager

// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork

12 changes: 6 additions & 6 deletions bitswap/client/internal/messagequeue/messagequeue.go
@@ -93,7 +93,7 @@ type MessageQueue struct {

// Dont touch any of these variables outside of run loop
sender bsnet.MessageSender
- rebroadcastIntervalLk sync.RWMutex
+ rebroadcastIntervalLk sync.Mutex
rebroadcastInterval time.Duration
rebroadcastTimer *clock.Timer
// For performance reasons we just clear out the fields of the message
@@ -389,9 +389,9 @@ func (mq *MessageQueue) SetRebroadcastInterval(delay time.Duration) {

// Startup starts the processing of messages and rebroadcasting.
func (mq *MessageQueue) Startup() {
- mq.rebroadcastIntervalLk.RLock()
+ mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastTimer = mq.clock.Timer(mq.rebroadcastInterval)
- mq.rebroadcastIntervalLk.RUnlock()
+ mq.rebroadcastIntervalLk.Unlock()
go mq.runQueue()
}

@@ -422,7 +422,7 @@ func (mq *MessageQueue) runQueue() {
}

var workScheduled time.Time
- for mq.ctx.Err() == nil {
+ for {
select {
case <-mq.rebroadcastTimer.C:
mq.rebroadcastWantlist()
@@ -471,9 +471,9 @@ func (mq *MessageQueue) runQueue() {

// Periodically resend the list of wants to the peer
func (mq *MessageQueue) rebroadcastWantlist() {
- mq.rebroadcastIntervalLk.RLock()
+ mq.rebroadcastIntervalLk.Lock()
mq.rebroadcastTimer.Reset(mq.rebroadcastInterval)
- mq.rebroadcastIntervalLk.RUnlock()
+ mq.rebroadcastIntervalLk.Unlock()

// If some wants were transferred from the rebroadcast list
if mq.transferRebroadcastWants() {
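
The lock changes in this file follow a common Go simplification: the critical sections around `rebroadcastInterval` are tiny and mostly re-arm the timer, so reader/writer separation buys nothing and a plain `sync.Mutex` is the simpler tool. A standalone sketch of the pattern (illustrative type, not boxo code):

```go
package main

import (
	"sync"
	"time"
)

// rebroadcaster mirrors the shape of the change above: the interval and timer
// are always touched together under one lock, so sync.Mutex is sufficient.
type rebroadcaster struct {
	mu       sync.Mutex
	interval time.Duration
	timer    *time.Timer
}

func (r *rebroadcaster) startup() {
	r.mu.Lock()
	r.timer = time.NewTimer(r.interval)
	r.mu.Unlock()
}

func (r *rebroadcaster) setInterval(d time.Duration) {
	r.mu.Lock()
	r.interval = d
	if r.timer != nil {
		r.timer.Reset(d)
	}
	r.mu.Unlock()
}

func main() {
	r := &rebroadcaster{interval: time.Second}
	r.startup()
	r.setInterval(2 * time.Second)
}
```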
3 changes: 2 additions & 1 deletion bitswap/client/internal/notifications/notifications.go
@@ -69,12 +69,13 @@ func (ps *impl) Shutdown() {
// corresponding to |keys|.
func (ps *impl) Subscribe(ctx context.Context, keys ...cid.Cid) <-chan blocks.Block {
blocksCh := make(chan blocks.Block, len(keys))
- valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking
if len(keys) == 0 {
close(blocksCh)
return blocksCh
}

+ valuesCh := make(chan interface{}, len(keys)) // provide our own channel to control buffer, prevent blocking

// prevent shutdown
ps.lk.RLock()
defer ps.lk.RUnlock()
2 changes: 1 addition & 1 deletion bitswap/client/internal/peermanager/peermanager.go
@@ -42,7 +42,7 @@ type PeerManager struct {
createPeerQueue PeerQueueFactory
ctx context.Context

- psLk sync.RWMutex
+ psLk sync.Mutex
sessions map[uint64]Session
peerSessions map[peer.ID]map[uint64]struct{}

10 changes: 6 additions & 4 deletions bitswap/client/internal/peermanager/peerwantmanager.go
@@ -1,8 +1,8 @@
package peermanager

import (
"bytes"
"fmt"
"strings"

cid "github.com/ipfs/go-cid"
peer "github.com/libp2p/go-libp2p/core/peer"
@@ -158,8 +158,6 @@ func (pwm *peerWantManager) broadcastWantHaves(wantHaves []cid.Cid) {
// sendWants only sends the peer the want-blocks and want-haves that have not
// already been sent to it.
func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves []cid.Cid) {
- fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))
- fltWantHvs := make([]cid.Cid, 0, len(wantHaves))

// Get the existing want-blocks and want-haves for the peer
pws, ok := pwm.peerWants[p]
@@ -169,6 +167,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
return
}

+ fltWantBlks := make([]cid.Cid, 0, len(wantBlocks))

// Iterate over the requested want-blocks
for _, c := range wantBlocks {
// If the want-block hasn't been sent to the peer
@@ -198,6 +198,8 @@ func (pwm *peerWantManager) sendWants(p peer.ID, wantBlocks []cid.Cid, wantHaves
pwm.reverseIndexAdd(c, p)
}

+ fltWantHvs := make([]cid.Cid, 0, len(wantHaves))

// Iterate over the requested want-haves
for _, c := range wantHaves {
// If we've already broadcasted this want, don't bother with a
@@ -450,7 +452,7 @@ func (pwm *peerWantManager) getWants() []cid.Cid {
}

func (pwm *peerWantManager) String() string {
- var b bytes.Buffer
+ var b strings.Builder
for p, ws := range pwm.peerWants {
b.WriteString(fmt.Sprintf("Peer %s: %d want-have / %d want-block:\n", p, ws.wantHaves.Len(), ws.wantBlocks.Len()))
for _, c := range ws.wantHaves.Keys() {
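
The `String()` change swaps `bytes.Buffer` for `strings.Builder`, the idiomatic accumulator when the end product is a string: `Builder.String()` returns the built bytes without the extra copy that `bytes.Buffer.String()` makes. A small self-contained version of the same pattern (plain strings instead of peer IDs and CIDs):

```go
package main

import (
	"fmt"
	"strings"
)

// describeWants renders a peer -> want-count map, appending to a
// strings.Builder and converting to a string exactly once at the end.
func describeWants(wants map[string]int) string {
	var b strings.Builder
	for p, n := range wants {
		fmt.Fprintf(&b, "Peer %s: %d wants\n", p, n)
	}
	return b.String()
}

func main() {
	fmt.Print(describeWants(map[string]int{"peerA": 3, "peerB": 1}))
}
```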
6 changes: 2 additions & 4 deletions bitswap/client/internal/session/peerresponsetracker.go
@@ -31,8 +31,6 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {
return ""
}

- rnd := rand.Float64()

// Find the total received blocks for all candidate peers
total := 0
for _, p := range peers {
@@ -41,6 +39,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {

// Choose one of the peers with a chance proportional to the number
// of blocks received from that peer
+ rnd := rand.Float64()
counted := 0.0
for _, p := range peers {
counted += float64(prt.getPeerCount(p)) / float64(total)
@@ -52,8 +51,7 @@ func (prt *peerResponseTracker) choose(peers []peer.ID) peer.ID {
// We shouldn't get here unless there is some weirdness with floating point
// math that doesn't quite cover the whole range of peers in the for loop
// so just choose the last peer.
- index := len(peers) - 1
- return peers[index]
+ return peers[len(peers)-1]
}

// getPeerCount returns the number of times the peer was first to send us a
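
For context, the function edited above performs weighted random selection: a peer is chosen with probability proportional to how many times it was first to deliver a block, and the trailing `return peers[len(peers)-1]` only guards against floating-point rounding leaving the accumulator just below `rnd`. A standalone sketch of the same algorithm, using plain strings in place of `peer.ID`:

```go
package main

import (
	"fmt"
	"math/rand"
)

// chooseWeighted picks one key with probability proportional to counts[key].
func chooseWeighted(counts map[string]int, keys []string) string {
	if len(keys) == 0 {
		return ""
	}

	total := 0
	for _, k := range keys {
		total += counts[k]
	}
	if total == 0 {
		// No history yet: fall back to a uniform choice.
		return keys[rand.Intn(len(keys))]
	}

	rnd := rand.Float64()
	acc := 0.0
	for _, k := range keys {
		acc += float64(counts[k]) / float64(total)
		if rnd < acc {
			return k
		}
	}
	// Rounding can leave acc fractionally below 1.0, so fall back to the
	// last candidate, as the code above does.
	return keys[len(keys)-1]
}

func main() {
	counts := map[string]int{"peerA": 5, "peerB": 1, "peerC": 0}
	fmt.Println(chooseWeighted(counts, []string{"peerA", "peerB", "peerC"}))
}
```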
10 changes: 8 additions & 2 deletions bitswap/client/internal/session/sessionwants.go
@@ -56,8 +56,12 @@ func (sw *sessionWants) GetNextWants() []cid.Cid {
// limit)
currentLiveCount := len(sw.liveWants)
toAdd := sw.broadcastLimit - currentLiveCount
+ liveSize := min(toAdd, sw.toFetch.Len())
+ if liveSize == 0 {
+ return nil
+ }

- var live []cid.Cid
+ live := make([]cid.Cid, 0, liveSize)
for ; toAdd > 0 && sw.toFetch.Len() > 0; toAdd-- {
c := sw.toFetch.Pop()
live = append(live, c)
@@ -117,6 +121,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
cleaned = append(cleaned, c)
}
}
+ clear(sw.liveWantsOrder[len(cleaned):]) // GC cleared items
sw.liveWantsOrder = cleaned
}

@@ -127,7 +132,7 @@ func (sw *sessionWants) BlocksReceived(ks []cid.Cid) ([]cid.Cid, time.Duration)
// live want CIDs up to the broadcast limit.
func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
now := time.Now()
- live := make([]cid.Cid, 0, len(sw.liveWants))
+ live := make([]cid.Cid, 0, min(len(sw.liveWants), sw.broadcastLimit))
for _, c := range sw.liveWantsOrder {
if _, ok := sw.liveWants[c]; ok {
// No response was received for the want, so reset the sent time
@@ -148,6 +153,7 @@ func (sw *sessionWants) PrepareBroadcast() []cid.Cid {
func (sw *sessionWants) CancelPending(keys []cid.Cid) {
for _, k := range keys {
sw.toFetch.Remove(k)
+ delete(sw.liveWants, k)
}
}

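
Two of the changelog fixes live in this file. `CancelPending` now deletes canceled keys from `liveWants` as well as the fetch queue, so canceled wants stop being rebroadcast (#690). And after `liveWantsOrder` is compacted in place, the leftover tail is cleared with the Go 1.21 `clear` builtin so the backing array no longer pins the dropped CIDs. A self-contained sketch of that compact-and-clear idiom, using strings:

```go
package main

import "fmt"

// compact keeps only the wanted keys, reusing the slice's backing array, then
// zeroes the abandoned tail so its old values can be garbage collected.
func compact(order []string, keep map[string]bool) []string {
	cleaned := order[:0] // reuse the same backing array
	for _, k := range order {
		if keep[k] {
			cleaned = append(cleaned, k)
		}
	}
	clear(order[len(cleaned):]) // Go 1.21+: zero the unused tail
	return cleaned
}

func main() {
	order := []string{"a", "b", "c", "d"}
	fmt.Println(compact(order, map[string]bool{"a": true, "c": true})) // [a c]
}
```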
8 changes: 3 additions & 5 deletions bitswap/client/internal/session/sessionwantsender.go
@@ -161,8 +161,7 @@ func (sws *sessionWantSender) Cancel(ks []cid.Cid) {
// Update is called when the session receives a message with incoming blocks
// or HAVE / DONT_HAVE
func (sws *sessionWantSender) Update(from peer.ID, ks []cid.Cid, haves []cid.Cid, dontHaves []cid.Cid) {
- hasUpdate := len(ks) > 0 || len(haves) > 0 || len(dontHaves) > 0
- if !hasUpdate {
+ if len(ks) == 0 && len(haves) == 0 && len(dontHaves) == 0 {
return
}

@@ -349,8 +348,7 @@ func (sws *sessionWantSender) trackWant(c cid.Cid) {
}

// Create the want info
- wi := newWantInfo(sws.peerRspTrkr)
- sws.wants[c] = wi
+ sws.wants[c] = newWantInfo(sws.peerRspTrkr)

// For each available peer, register any information we know about
// whether the peer has the block
@@ -481,7 +479,7 @@ func (sws *sessionWantSender) checkForExhaustedWants(dontHaves []cid.Cid, newlyU
// (because it may be the last peer who hadn't sent a DONT_HAVE for a CID)
if len(newlyUnavailable) > 0 {
// Collect all pending wants
- wants = make([]cid.Cid, len(sws.wants))
+ wants = make([]cid.Cid, 0, len(sws.wants))
for c := range sws.wants {
wants = append(wants, c)
}
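
The last hunk above is the classic length-versus-capacity slip: `make([]cid.Cid, len(m))` produces a slice that already contains `len(m)` zero-value (undefined) CIDs, so the subsequent `append` calls double its size and leave invalid CIDs at the front, which is the issue called out in the changelog (#692). The fix passes length 0 and capacity `len(m)`. A tiny demonstration with ints:

```go
package main

import "fmt"

func main() {
	src := []int{1, 2, 3}

	// Bug: length 3, so append writes *after* three zero values.
	buggy := make([]int, len(src))
	for _, v := range src {
		buggy = append(buggy, v)
	}
	fmt.Println(buggy) // [0 0 0 1 2 3]

	// Fix: length 0, capacity 3, so append fills the slice as intended.
	fixed := make([]int, 0, len(src))
	for _, v := range src {
		fixed = append(fixed, v)
	}
	fmt.Println(fixed) // [1 2 3]
}
```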
8 changes: 4 additions & 4 deletions bitswap/client/internal/sessionmanager/sessionmanager.go
@@ -57,7 +57,7 @@ type SessionManager struct {
notif notifications.PubSub

// Sessions
- sessLk sync.RWMutex
+ sessLk sync.Mutex
sessions map[uint64]Session

// Session Index
@@ -159,13 +159,13 @@ func (sm *SessionManager) ReceiveFrom(ctx context.Context, p peer.ID, blks []cid

// Notify each session that is interested in the blocks / HAVEs / DONT_HAVEs
for _, id := range sm.sessionInterestManager.InterestedSessions(blks, haves, dontHaves) {
- sm.sessLk.RLock()
+ sm.sessLk.Lock()
if sm.sessions == nil { // check if SessionManager was shutdown
- sm.sessLk.RUnlock()
+ sm.sessLk.Unlock()
return
}
sess, ok := sm.sessions[id]
- sm.sessLk.RUnlock()
+ sm.sessLk.Unlock()

if ok {
sess.ReceiveFrom(p, blks, haves, dontHaves)
5 changes: 4 additions & 1 deletion bitswap/network/ipfs_impl.go
@@ -179,10 +179,13 @@ func (s *streamMessageSender) multiAttempt(ctx context.Context, fn func() error)
return err
}

+ timer := time.NewTimer(s.opts.SendErrorBackoff)
+ defer timer.Stop()

select {
case <-ctx.Done():
return ctx.Err()
- case <-time.After(s.opts.SendErrorBackoff):
+ case <-timer.C:
// wait a short time in case disconnect notifications are still propagating
log.Infof("send message to %s failed but context was not Done: %s", s.to, err)
}
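
The backoff change swaps `time.After` for an explicit timer. A timer started by `time.After` cannot be stopped, so when the `ctx.Done()` branch wins the select it keeps running (and, before Go 1.23, is not collected) until it fires; `time.NewTimer` with a deferred `Stop` releases it on the early-exit path. A minimal sketch of the pattern:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitBackoff sleeps for backoff unless ctx is canceled first. The explicit
// timer is stopped on the cancellation path instead of being left to fire.
func waitBackoff(ctx context.Context, backoff time.Duration) error {
	timer := time.NewTimer(backoff)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	err := waitBackoff(ctx, time.Second)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```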
6 changes: 5 additions & 1 deletion bootstrap/bootstrap.go
@@ -304,15 +304,19 @@ func peersConnect(ctx context.Context, ph host.Host, availablePeers []peer.AddrI
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
+ timer := time.NewTimer(time.Second)
+ defer timer.Stop()

for {
select {
case <-ctx.Done():
return
- case <-time.After(1 * time.Second):
+ case <-timer.C:
if int(atomic.LoadUint64(&connected)) >= needed {
cancel()
return
}
+ timer.Reset(time.Second)
}
}
}()
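
The bootstrap change is the looping variant of the same idea: rather than `time.After` allocating a fresh timer on every iteration, one timer is created up front and re-armed with `Reset` after each check. A standalone sketch of such a poll loop (the condition function is illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollUntil re-checks cond on each tick of a single reused timer until cond
// returns true or ctx is canceled.
func pollUntil(ctx context.Context, interval time.Duration, cond func() bool) bool {
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return false
		case <-timer.C:
			if cond() {
				return true
			}
			timer.Reset(interval) // re-arm only after the check, as in the diff
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	start := time.Now()
	ok := pollUntil(ctx, 100*time.Millisecond, func() bool {
		return time.Since(start) >= 250*time.Millisecond
	})
	fmt.Println(ok) // true
}
```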