From 2528de73ce38b94c15e15a24a3b7416d752dedcc Mon Sep 17 00:00:00 2001 From: Paul Dittamo <37558497+pvditt@users.noreply.github.com> Date: Mon, 8 Apr 2024 15:46:50 -0700 Subject: [PATCH] add cache client read and write otel tracing (#5184) * add cache client read and write otel tracing Signed-off-by: Paul Dittamo * lint Signed-off-by: Paul Dittamo --------- Signed-off-by: Paul Dittamo --- flytepropeller/pkg/controller/nodes/cache.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flytepropeller/pkg/controller/nodes/cache.go b/flytepropeller/pkg/controller/nodes/cache.go index 5d4c8455a5..59cf21057d 100644 --- a/flytepropeller/pkg/controller/nodes/cache.go +++ b/flytepropeller/pkg/controller/nodes/cache.go @@ -21,6 +21,7 @@ import ( "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces" "github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task" "github.com/flyteorg/flyte/flytestdlib/logger" + "github.com/flyteorg/flyte/flytestdlib/otelutils" "github.com/flyteorg/flyte/flytestdlib/storage" ) @@ -77,6 +78,8 @@ func updatePhaseCacheInfo(phaseInfo handler.PhaseInfo, cacheStatus *catalog.Stat // CheckCatalogCache uses the handler and contexts to check if cached outputs for the current node // exist. If the exist, this function also copies the outputs to this node. func (n *nodeExecutor) CheckCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Entry, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.nodes.NodeExecutor/CheckCatalogCache") + defer span.End() catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx) if err != nil { return catalog.Entry{}, errors.Wrapf(err, "failed to initialize the catalogKey") @@ -197,6 +200,8 @@ func (n *nodeExecutor) ReleaseCatalogReservation(ctx context.Context, nCtx inter // WriteCatalogCache relays the outputs of this node to the cache. This allows future executions // to reuse these data to avoid recomputation. func (n *nodeExecutor) WriteCatalogCache(ctx context.Context, nCtx interfaces.NodeExecutionContext, cacheHandler interfaces.CacheableNodeHandler) (catalog.Status, error) { + ctx, span := otelutils.NewSpan(ctx, otelutils.FlytePropellerTracer, "pkg.controller.nodes.NodeExecutor/WriteCatalogCache") + defer span.End() catalogKey, err := cacheHandler.GetCatalogKey(ctx, nCtx) if err != nil { return catalog.NewStatus(core.CatalogCacheStatus_CACHE_DISABLED, nil), errors.Wrapf(err, "failed to initialize the catalogKey")