Skip to content

Commit

Permalink
Some logging improvements for LLO
Browse files (browse the repository at this point in the history)
  • Loading branch information
samsondav committed Nov 11, 2024
1 parent 340a6bf commit 6ee0267
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
33 changes: 23 additions & 10 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"sort"
"sync"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -100,6 +101,8 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,
d.lggr.Debugw("Observing streams", "streamIDs", streamIDs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

now := time.Now()

for _, streamID := range maps.Keys(streamValues) {
go func(streamID llotypes.StreamID) {
defer wg.Done()
Expand Down Expand Up @@ -147,26 +150,36 @@ func (d *dataSource) Observe(ctx context.Context, streamValues llo.StreamValues,

wg.Wait()

elapsed := time.Since(now)

// Failed observations are always logged at warn level
var successfulStreamIDs []streams.StreamID
var failedStreamIDs []streams.StreamID
if len(errs) > 0 {
var errStrs []string

// Only log errors or if VerboseLogging is turned on
if len(errs) > 0 || opts.VerboseLogging() {
successfulStreamIDs = make([]streams.StreamID, 0, len(streamValues))
for strmID := range streamValues {
successfulStreamIDs = append(successfulStreamIDs, strmID)
}
sort.Slice(successfulStreamIDs, func(i, j int) bool { return successfulStreamIDs[i] < successfulStreamIDs[j] })

sort.Slice(errs, func(i, j int) bool { return errs[i].streamID < errs[j].streamID })
failedStreamIDs = make([]streams.StreamID, len(errs))
errStrs := make([]string, len(errs))
errStrs = make([]string, len(errs))
for i, e := range errs {
errStrs[i] = e.String()
failedStreamIDs[i] = e.streamID
}
d.lggr.Warnw("Observation failed for streams", "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)
}

if opts.VerboseLogging() {
successes := make([]streams.StreamID, 0, len(streamValues))
for strmID := range streamValues {
successes = append(successes, strmID)
lggr := logger.With(d.lggr, "elapsed", elapsed, "nSuccessfulStreams", len(successfulStreamIDs), "nFailedStreams", len(failedStreamIDs), "successfulStreamIDs", successfulStreamIDs, "failedStreamIDs", failedStreamIDs, "errs", errStrs, "configDigest", opts.ConfigDigest(), "seqNr", opts.OutCtx().SeqNr)

if len(errs) == 0 && opts.VerboseLogging() {
lggr.Infow("Observation succeeded for all streams")
} else if len(errs) > 0 {
lggr.Warnw("Observation failed for streams")
}
sort.Slice(successes, func(i, j int) bool { return successes[i] < successes[j] })
d.lggr.Debugw("Observation complete", "successfulStreamIDs", successes, "failedStreamIDs", failedStreamIDs, "configDigest", opts.ConfigDigest(), "values", streamValues, "seqNr", opts.OutCtx().SeqNr)
}

return nil
Expand Down
1 change: 0 additions & 1 deletion core/services/ocrcommon/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func ParseMercuryEATelemetry(lggr logger.Logger, trrs pipeline.TaskRunResults, f

bridgeRawResponse, ok := trr.Result.Value.(string)
if !ok {
lggr.Warnw(fmt.Sprintf("cannot get bridge response from bridge task, id=%s, name=%q, expected string got %T", trr.Task.DotID(), bridgeName, trr.Result.Value), "dotID", trr.Task.DotID(), "bridgeName", bridgeName)
continue
}
eaTelem, err := parseEATelemetry([]byte(bridgeRawResponse))
Expand Down

0 comments on commit 6ee0267

Please sign in to comment.