From d8222d58154c2742d845feabf5cd90ae9b3976f5 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 25 Jul 2024 14:37:04 -0400 Subject: [PATCH] fix(bitswap/client/providerquerymanager): don't end trace span until all providers are returned (cherry picked from commit 12cbf258332ea6726c233b029b7833a9eea2d9d4) --- .../internal/providerquerymanager/providerquerymanager.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/bitswap/client/internal/providerquerymanager/providerquerymanager.go b/bitswap/client/internal/providerquerymanager/providerquerymanager.go index e57726ddc..c85efe737 100644 --- a/bitswap/client/internal/providerquerymanager/providerquerymanager.go +++ b/bitswap/client/internal/providerquerymanager/providerquerymanager.go @@ -126,7 +126,6 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, var span trace.Span sessionCtx, span = internal.StartSpan(sessionCtx, "ProviderQueryManager.FindProvidersAsync", trace.WithAttributes(attribute.Stringer("cid", k))) - defer span.End() select { case pqm.providerQueryMessages <- &newProvideQueryMessage{ @@ -137,6 +136,7 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, case <-pqm.ctx.Done(): ch := make(chan peer.ID) close(ch) + span.End() return ch case <-sessionCtx.Done(): ch := make(chan peer.ID) @@ -152,14 +152,15 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context, case <-pqm.ctx.Done(): ch := make(chan peer.ID) close(ch) + span.End() return ch case receivedInProgressRequest = <-inProgressRequestChan: } - return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest) + return pqm.receiveProviders(sessionCtx, k, receivedInProgressRequest, func() { span.End() }) } -func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest) <-chan peer.ID { +func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k cid.Cid, receivedInProgressRequest inProgressRequest, onCloseFn func()) <-chan peer.ID { // maintains an unbuffered queue for incoming providers for given request for a given session // essentially, as a provider comes in, for a given CID, we want to immediately broadcast to all // sessions that queried that CID, without worrying about whether the client code is actually @@ -171,6 +172,7 @@ func (pqm *ProviderQueryManager) receiveProviders(sessionCtx context.Context, k go func() { defer close(returnedProviders) + defer onCloseFn() outgoingProviders := func() chan<- peer.ID { if len(receivedProviders) == 0 { return nil