Skip to content

Commit

Permalink
Improve validation metrics for discarded samples and exemplars (corte…
Browse files Browse the repository at this point in the history
  • Loading branch information
yeya24 authored Sep 20, 2024
1 parent 0f848d8 commit d829d65
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 34 deletions.
16 changes: 13 additions & 3 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -874,7 +874,13 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
).Add(float64(len(ts.Samples)))
).Add(float64(len(ts.Samples) + len(ts.Histograms)))

// all labels are gone, exemplars will be discarded
d.validateMetrics.DiscardedExemplars.WithLabelValues(
validation.DroppedByRelabelConfiguration,
userID,
).Add(float64(len(ts.Exemplars)))
continue
}
ts.Labels = cortexpb.FromLabelsToLabelAdapters(l)
Expand All @@ -892,11 +898,15 @@ func (d *Distributor) prepareSeriesKeys(ctx context.Context, req *cortexpb.Write
}

if len(ts.Labels) == 0 {
d.validateMetrics.DiscardedExemplars.WithLabelValues(
d.validateMetrics.DiscardedSamples.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
).Add(float64(len(ts.Samples)))
).Add(float64(len(ts.Samples) + len(ts.Histograms)))

d.validateMetrics.DiscardedExemplars.WithLabelValues(
validation.DroppedByUserConfigurationOverride,
userID,
).Add(float64(len(ts.Exemplars)))
continue
}

Expand Down
100 changes: 69 additions & 31 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,7 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
expectedSeries labels.Labels
removeReplica bool
removeLabels []string
exemplars []cortexpb.Exemplar
}

cases := []testcase{
Expand Down Expand Up @@ -1536,6 +1537,20 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
{Name: "cluster", Value: "one"},
},
},
// No labels left.
{
removeReplica: true,
removeLabels: []string{"cluster"},
inputSeries: labels.Labels{
{Name: "cluster", Value: "one"},
{Name: "__replica__", Value: "two"},
},
expectedSeries: labels.Labels{},
exemplars: []cortexpb.Exemplar{
{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("test", "a")), Value: 1, TimestampMs: 0},
{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("test", "b")), Value: 1, TimestampMs: 0},
},
},
}

for _, tc := range cases {
Expand All @@ -1546,6 +1561,15 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {
limits.DropLabels = tc.removeLabels
limits.AcceptHASamples = tc.removeReplica

expectedDiscardedSamples := 0
expectedDiscardedExemplars := 0
if tc.expectedSeries.Len() == 0 {
expectedDiscardedSamples = 1
expectedDiscardedExemplars = len(tc.exemplars)
// Allow series with no labels to ingest
limits.EnforceMetricName = false
}

ds, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
Expand All @@ -1556,14 +1580,24 @@ func TestDistributor_Push_LabelRemoval(t *testing.T) {

// Push the series to the distributor
req := mockWriteRequest([]labels.Labels{tc.inputSeries}, 1, 1, histogram)
req.Timeseries[0].Exemplars = tc.exemplars
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

actualDiscardedSamples := testutil.ToFloat64(ds[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.DroppedByUserConfigurationOverride, "user"))
actualDiscardedExemplars := testutil.ToFloat64(ds[0].validateMetrics.DiscardedExemplars.WithLabelValues(validation.DroppedByUserConfigurationOverride, "user"))
require.Equal(t, float64(expectedDiscardedSamples), actualDiscardedSamples)
require.Equal(t, float64(expectedDiscardedExemplars), actualDiscardedExemplars)

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
expectedSeries := 1
if tc.expectedSeries.Len() == 0 {
expectedSeries = 0
}
assert.Equal(t, expectedSeries, len(timeseries))
for _, v := range timeseries {
assert.Equal(t, tc.expectedSeries, cortexpb.FromLabelAdaptersToLabels(v.Labels))
}
Expand Down Expand Up @@ -3777,39 +3811,43 @@ func TestDistributor_Push_RelabelDropWillExportMetricOfDroppedSamples(t *testing
flagext.DefaultValues(&limits)
limits.MetricRelabelConfigs = metricRelabelConfigs

ds, ingesters, regs, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

// Push the series to the distributor
req := mockWriteRequest(inputSeries, 1, 1, false)
ctx := user.InjectOrgID(context.Background(), "userDistributorPushRelabelDropWillExportMetricOfDroppedSamples")
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)
for _, histogramEnabled := range []bool{false, true} {
ds, ingesters, _, _ := prepare(t, prepConfig{
numIngesters: 2,
happyIngesters: 2,
numDistributors: 1,
shardByAllLabels: true,
limits: &limits,
})

// Since each test pushes only 1 series, we do expect the ingester
// to have received exactly 1 series
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
}
// Push the series to the distributor
id := "user"
req := mockWriteRequest(inputSeries, 1, 1, histogramEnabled)
req.Timeseries[0].Exemplars = []cortexpb.Exemplar{
{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("test", "a")), Value: 1, TimestampMs: 0},
{Labels: cortexpb.FromLabelsToLabelAdapters(labels.FromStrings("test", "b")), Value: 1, TimestampMs: 0},
}
ctx := user.InjectOrgID(context.Background(), id)
_, err = ds[0].Push(ctx, req)
require.NoError(t, err)

metrics := []string{"cortex_distributor_received_samples_total", "cortex_discarded_samples_total"}
for i := range ingesters {
timeseries := ingesters[i].series()
assert.Equal(t, 1, len(timeseries))
}

expectedMetrics := `
# HELP cortex_discarded_samples_total The total number of samples that were discarded.
# TYPE cortex_discarded_samples_total counter
cortex_discarded_samples_total{reason="relabel_configuration",user="userDistributorPushRelabelDropWillExportMetricOfDroppedSamples"} 1
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{type="float",user="userDistributorPushRelabelDropWillExportMetricOfDroppedSamples"} 1
cortex_distributor_received_samples_total{type="histogram",user="userDistributorPushRelabelDropWillExportMetricOfDroppedSamples"} 0
`
require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(expectedMetrics), metrics...))
require.Equal(t, testutil.ToFloat64(ds[0].validateMetrics.DiscardedSamples.WithLabelValues(validation.DroppedByRelabelConfiguration, id)), float64(1))
require.Equal(t, testutil.ToFloat64(ds[0].validateMetrics.DiscardedExemplars.WithLabelValues(validation.DroppedByRelabelConfiguration, id)), float64(2))
receivedFloatSamples := testutil.ToFloat64(ds[0].receivedSamples.WithLabelValues(id, "float"))
receivedHistogramSamples := testutil.ToFloat64(ds[0].receivedSamples.WithLabelValues(id, "histogram"))
if histogramEnabled {
require.Equal(t, receivedFloatSamples, float64(0))
require.Equal(t, receivedHistogramSamples, float64(1))
} else {
require.Equal(t, receivedFloatSamples, float64(1))
require.Equal(t, receivedHistogramSamples, float64(0))
}
}
}

func countMockIngestersCalls(ingesters []*mockIngester, name string) int {
Expand Down

0 comments on commit d829d65

Please sign in to comment.