Skip to content

Commit

Permalink
ingester: Fix regression on usage of cortex_ingester_queried_chunks (#…
Browse files Browse the repository at this point in the history
…6398)

* ingester: Fix regression on usage of cortex_ingester_queried_chunks

Fixes: #6396

Signed-off-by: Charlie Le <[email protected]>

* update changelog

Signed-off-by: Charlie Le <[email protected]>

---------

Signed-off-by: Charlie Le <[email protected]>
  • Loading branch information
CharlieTLe authored Dec 4, 2024
1 parent 320e475 commit dea6dcf
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224
* [BUGFIX] Ruler: Allow rule evaluation to complete during shutdown. #6326
* [BUGFIX] Ring: update ring with new ip address when instance is lost, rejoins, but heartbeat is disabled #6271
* [BUGFIX] ingester: Fix regression on usage of cortex_ingester_queried_chunks #6398

## 1.18.1 2024-10-14

Expand Down
28 changes: 16 additions & 12 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,18 +1968,21 @@ func (i *Ingester) QueryStream(req *client.QueryRequest, stream client.Ingester_
numSamples := 0
numSeries := 0
totalDataBytes := 0
numSeries, numSamples, totalDataBytes, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)
numChunks := 0
numSeries, numSamples, totalDataBytes, numChunks, err = i.queryStreamChunks(ctx, db, int64(from), int64(through), matchers, shardMatcher, stream)

if err != nil {
return err
}

i.metrics.queriedSeries.Observe(float64(numSeries))
i.metrics.queriedSamples.Observe(float64(numSamples))
level.Debug(spanlog).Log("series", numSeries, "samples", numSamples, "data_bytes", totalDataBytes)
i.metrics.queriedChunks.Observe(float64(numChunks))
level.Debug(spanlog).Log("series", numSeries, "samples", numSamples, "data_bytes", totalDataBytes, "chunks", numChunks)
spanlog.SetTag("series", numSeries)
spanlog.SetTag("samples", numSamples)
spanlog.SetTag("data_bytes", totalDataBytes)
spanlog.SetTag("chunks", numChunks)
return nil
}

Expand All @@ -1998,16 +2001,16 @@ func (i *Ingester) trackInflightQueryRequest() (func(), error) {
}

// queryStreamChunks streams metrics from a TSDB. This implements the client.IngesterServer interface
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes int, _ error) {
func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, through int64, matchers []*labels.Matcher, sm *storepb.ShardMatcher, stream client.Ingester_QueryStreamServer) (numSeries, numSamples, totalBatchSizeBytes, numChunks int, _ error) {
q, err := db.ChunkQuerier(from, through)
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}
defer q.Close()

c, err := i.trackInflightQueryRequest()
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}
hints := &storage.SelectHints{
Start: from,
Expand All @@ -2018,7 +2021,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
ss := q.Select(ctx, false, hints, matchers...)
c()
if ss.Err() != nil {
return 0, 0, 0, ss.Err()
return 0, 0, 0, 0, ss.Err()
}

chunkSeries := make([]client.TimeSeriesChunk, 0, queryStreamBatchSize)
Expand All @@ -2044,7 +2047,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
// It is not guaranteed that chunk returned by iterator is populated.
// For now just return error. We could also try to figure out how to read the chunk.
if meta.Chunk == nil {
return 0, 0, 0, errors.Errorf("unfilled chunk returned from TSDB chunk querier")
return 0, 0, 0, 0, errors.Errorf("unfilled chunk returned from TSDB chunk querier")
}

ch := client.Chunk{
Expand All @@ -2061,10 +2064,11 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
case chunkenc.EncFloatHistogram:
ch.Encoding = int32(encoding.PrometheusFloatHistogramChunk)
default:
return 0, 0, 0, errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding())
return 0, 0, 0, 0, errors.Errorf("unknown chunk encoding from TSDB chunk querier: %v", meta.Chunk.Encoding())
}

ts.Chunks = append(ts.Chunks, ch)
numChunks++
numSamples += meta.Chunk.NumSamples()
}
numSeries++
Expand All @@ -2078,7 +2082,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
Chunkseries: chunkSeries,
})
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}

batchSizeBytes = 0
Expand All @@ -2091,7 +2095,7 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th

// Ensure no error occurred while iterating the series set.
if err := ss.Err(); err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}

// Final flush any existing metrics
Expand All @@ -2100,11 +2104,11 @@ func (i *Ingester) queryStreamChunks(ctx context.Context, db *userTSDB, from, th
Chunkseries: chunkSeries,
})
if err != nil {
return 0, 0, 0, err
return 0, 0, 0, 0, err
}
}

return numSeries, numSamples, totalBatchSizeBytes, nil
return numSeries, numSamples, totalBatchSizeBytes, numChunks, nil
}

func (i *Ingester) getTSDB(userID string) *userTSDB {
Expand Down
20 changes: 19 additions & 1 deletion pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3087,7 +3087,8 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
// Create ingester.
cfg := defaultIngesterTestConfig(t)

i, err := prepareIngesterWithBlocksStorage(t, cfg, prometheus.NewRegistry())
reg := prometheus.NewRegistry()
i, err := prepareIngesterWithBlocksStorage(t, cfg, reg)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
Expand Down Expand Up @@ -3154,6 +3155,7 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
recvMsgs := 0
series := 0
totalSamples := 0
totalChunks := 0

for {
resp, err := s.Recv()
Expand All @@ -3174,6 +3176,7 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
require.NoError(t, err)
totalSamples += chk.NumSamples()
}
totalChunks += len(ts.Chunks)
}
}

Expand All @@ -3183,6 +3186,21 @@ func TestIngester_QueryStreamManySamplesChunks(t *testing.T) {
require.True(t, 2 <= recvMsgs && recvMsgs <= 3)
require.Equal(t, 3, series)
require.Equal(t, 100000+500000+samplesCount, totalSamples)
require.Equal(t, 13335, totalChunks)
require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
# HELP cortex_ingester_queried_chunks The total number of chunks returned from queries.
# TYPE cortex_ingester_queried_chunks histogram
cortex_ingester_queried_chunks_bucket{le="10"} 0
cortex_ingester_queried_chunks_bucket{le="80"} 0
cortex_ingester_queried_chunks_bucket{le="640"} 0
cortex_ingester_queried_chunks_bucket{le="5120"} 0
cortex_ingester_queried_chunks_bucket{le="40960"} 1
cortex_ingester_queried_chunks_bucket{le="327680"} 1
cortex_ingester_queried_chunks_bucket{le="2.62144e+06"} 1
cortex_ingester_queried_chunks_bucket{le="+Inf"} 1
cortex_ingester_queried_chunks_sum 13335
cortex_ingester_queried_chunks_count 1
`), `cortex_ingester_queried_chunks`))
}

func writeRequestSingleSeries(lbls labels.Labels, samples []cortexpb.Sample) *cortexpb.WriteRequest {
Expand Down

0 comments on commit dea6dcf

Please sign in to comment.