Skip to content

Commit

Permalink
fix(bitswap/client/providerquerymanager): don't end trace span until …
Browse files Browse the repository at this point in the history
…all providers are returned

(cherry picked from commit 12cbf25)
  • Loading branch information
aschmahmann authored and gammazero committed Nov 20, 2024
1 parent 13d0b32 commit d8222d5
Showing 1 changed file with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,

var span trace.Span
sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k)))
defer span.End()

select {
case pqm.providerQueryMessages <- &newProvideQueryMessage{
Expand All @@ -137,6 +136,7 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,
case <-pqm.ctx.Done():
ch := make(chan peer.ID)
close(ch)
span.End()

Check warning on line 139 in bitswap/client/internal/providerquerymanager/providerquerymanager.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/providerquerymanager/providerquerymanager.go#L139

Added line #L139 was not covered by tests
return ch
case <-sessionCtx.Done():
ch := make(chan peer.ID)
Expand All @@ -152,14 +152,15 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,
case <-pqm.ctx.Done():
ch := make(chan peer.ID)
close(ch)
span.End()
return ch
case receivedInProgressRequest = <-inProgressRequestChan:
}

return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest)
return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest, func() { span.End() })
}

func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest) <-chan peer.ID {
func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.ID {
// maintains an unbuffered queue for incoming providers for given request for a given session
// essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all
// sessions that queried that CID, without worrying about whether the client code is actually
Expand All @@ -171,6 +172,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k

go func() {
defer close(returnedProviders)
defer onCloseFn()
outgoingProviders := func() chan<- peer.ID {
if len(receivedProviders) == 0 {
return nil
Expand Down

0 comments on commit d8222d5

Please sign in to comment.