POC RW2 to RW1 and new fields #10432

Draft
wants to merge 10 commits into base: main
19 changes: 16 additions & 3 deletions integration/distributor_test.go
@@ -376,6 +376,7 @@ overrides:
func TestDistributorRemoteWrite2(t *testing.T) {
queryEnd := time.Now().Round(time.Second)
queryStart := queryEnd.Add(-1 * time.Hour)
queryStep := 10 * time.Minute

testCases := map[string]struct {
inRemoteWrite []*promRW2.Request
@@ -424,7 +425,7 @@ func TestDistributorRemoteWrite2(t *testing.T) {
"-distributor.ha-tracker.store": "consul",
"-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(),
"-distributor.ha-tracker.prefix": "prom_ha/",
"-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(false),
"-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(false), // TODO(krajorama): add cachingUnmarshalDataEnabled testcase.
}

flags := mergeFlags(
@@ -490,10 +491,22 @@ func TestDistributorRemoteWrite2(t *testing.T) {

res, err := client.PushRW2(ser)
require.NoError(t, err)
require.Equal(t, http.StatusUnsupportedMediaType, res.StatusCode)
require.True(t, res.StatusCode == http.StatusOK || res.StatusCode == http.StatusAccepted, res.Status)
}

// Placeholder for actual query tests.
for q, res := range tc.queries {
result, err := client.QueryRange(q, queryStart, queryEnd, queryStep)
require.NoError(t, err)

require.Equal(t, res.String(), result.String())
}

for q, expResult := range tc.exemplarQueries {
result, err := client.QueryExemplars(q, queryStart, queryEnd)
require.NoError(t, err)

require.Equal(t, expResult, result)
}
})
}
}
13 changes: 8 additions & 5 deletions pkg/distributor/distributor.go
@@ -1584,7 +1584,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

d.updateReceivedMetrics(req, userID)
d.updateReceivedMetrics(ctx, req, userID)

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
@@ -1815,17 +1815,20 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedExemplars, receivedMetadata int
func (d *Distributor) updateReceivedMetrics(ctx context.Context, req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedHistograms, receivedExemplars, receivedMetadata int
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedSamples += len(ts.TimeSeries.Samples)
receivedHistograms += len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
}
receivedMetadata = len(req.Metadata)

d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples + receivedHistograms))
d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))

updateWriteResponseStatsCtx(ctx, receivedSamples, receivedHistograms, receivedExemplars)
}

// forReplicationSets runs f, in parallel, for all ingesters in the input replicationSets.
128 changes: 84 additions & 44 deletions pkg/distributor/push.go
@@ -25,6 +25,7 @@ import (
"github.com/grafana/dskit/user"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
promRemote "github.com/prometheus/prometheus/storage/remote"

"github.com/grafana/mimir/pkg/mimirpb"
"github.com/grafana/mimir/pkg/util"
@@ -36,6 +37,13 @@ import (
// PushFunc defines the type of the push. It is similar to http.HandlerFunc.
type PushFunc func(ctx context.Context, req *Request) error

// The PushFunc might store promRemote.WriteResponseStats in the context.
type pushResponseStatsContextMarker struct{}

var (
PushResponseStatsContextKey = &pushResponseStatsContextMarker{}
)

// parserFunc defines how to read the request body from an HTTP request. It takes an optional RequestBuffers.
type parserFunc func(ctx context.Context, r *http.Request, maxSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error

@@ -151,65 +159,68 @@ func handler(
}
}

var supplier supplierFunc
isRW2, err := isRemoteWrite2(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
}
if isRW2 {
supplier = func() (*mimirpb.WriteRequest, func(), error) {
// Return 415 Unsupported Media Type for remote-write v2 requests for now. This is not retryable
// unless the client switches to remote-write v1.
return nil, nil, httpgrpc.Error(http.StatusUnsupportedMediaType, "remote-write v2 is not supported")
}
} else {
supplier = func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest
supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest

userID, err := tenant.TenantID(ctx)
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
}
req.UnmarshalFromRW2 = isRW2

// userID might be empty if none was in the ctx, in this case just use the default setting.
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
// Optimization to avoid the allocations required for unmarshaling exemplars.
req.SkipUnmarshalingExemplars = true
}
userID, err := tenant.TenantID(ctx)
if err != nil && !errors.Is(err, user.ErrNoOrgID) { // ignore user.ErrNoOrgID
return nil, nil, errors.Wrap(err, "failed to get tenant ID")
}

if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
}
// userID might be empty if none was in the ctx, in this case just use the default setting.
if limits.MaxGlobalExemplarsPerUser(userID) == 0 {
// The user is not allowed to send exemplars, so there is no need to unmarshal them.
// Optimization to avoid the allocations required for unmarshaling exemplars.
req.SkipUnmarshalingExemplars = true
}

rb.CleanUp()
return nil, nil, err
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
if _, ok := httpgrpc.HTTPResponseFromError(err); !ok {
err = httpgrpc.Error(http.StatusBadRequest, err.Error())
}

if allowSkipLabelNameValidation {
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
} else {
req.SkipLabelValidation = false
}
rb.CleanUp()
return nil, nil, err
}

if allowSkipLabelCountValidation {
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
} else {
req.SkipLabelCountValidation = false
}
if allowSkipLabelNameValidation {
req.SkipLabelValidation = req.SkipLabelValidation && r.Header.Get(SkipLabelNameValidationHeader) == "true"
} else {
req.SkipLabelValidation = false
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
if allowSkipLabelCountValidation {
req.SkipLabelCountValidation = req.SkipLabelCountValidation && r.Header.Get(SkipLabelCountValidationHeader) == "true"
} else {
req.SkipLabelCountValidation = false
}

cleanup := func() {
mimirpb.ReuseSlice(req.Timeseries)
rb.CleanUp()
}
return &req.WriteRequest, cleanup, nil
}
req := newRequest(supplier)
if err := push(ctx, req); err != nil {
ctx = contextWithWriteResponseStats(ctx)
err = push(ctx, req)
rsValue := ctx.Value(PushResponseStatsContextKey)
if rsValue != nil {
rs := rsValue.(*promRemote.WriteResponseStats)
addWriteResponseStats(w, rs)
} else {
// This should not happen, but if it does, we should not panic.
addWriteResponseStats(w, &promRemote.WriteResponseStats{})
}
if err != nil {
if errors.Is(err, context.Canceled) {
http.Error(w, err.Error(), statusClientClosedRequest)
level.Warn(logger).Log("msg", "push request canceled", "err", err)
@@ -275,6 +286,35 @@ func isRemoteWrite2(r *http.Request) (bool, error) {
return false, nil
}

// Consts from https://github.com/prometheus/prometheus/blob/main/storage/remote/stats.go
const (
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"
)
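
For context, the following is an illustrative sketch (not part of this diff) of how a remote-write 2.0 sender could read these written-stats headers back from the push response. The endpoint URL, request body, content-type value and the bytes/net/http/strconv imports are assumptions taken from the remote-write 2.0 spec, not from this PR.

// pushAndReadStats is an illustrative sketch, not part of this change. It
// sends an already-encoded remote-write 2.0 payload and reads back the
// X-Prometheus-Remote-Write-*-Written headers that addWriteResponseStats sets.
func pushAndReadStats(url string, body []byte) (samples, histograms, exemplars int, err error) {
	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return 0, 0, 0, err
	}
	// Content type and encoding assumed from the remote-write 2.0 spec.
	req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
	req.Header.Set("Content-Encoding", "snappy")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return 0, 0, 0, err
	}
	defer resp.Body.Close()

	samples, _ = strconv.Atoi(resp.Header.Get(rw20WrittenSamplesHeader))
	histograms, _ = strconv.Atoi(resp.Header.Get(rw20WrittenHistogramsHeader))
	exemplars, _ = strconv.Atoi(resp.Header.Get(rw20WrittenExemplarsHeader))
	return samples, histograms, exemplars, nil
}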

func contextWithWriteResponseStats(ctx context.Context) context.Context {
return context.WithValue(ctx, PushResponseStatsContextKey, &promRemote.WriteResponseStats{})
}

func addWriteResponseStats(w http.ResponseWriter, rs *promRemote.WriteResponseStats) {
headers := w.Header()
headers.Add(rw20WrittenSamplesHeader, strconv.Itoa(rs.Samples))
headers.Add(rw20WrittenHistogramsHeader, strconv.Itoa(rs.Histograms))
headers.Add(rw20WrittenExemplarsHeader, strconv.Itoa(rs.Exemplars))
}

func updateWriteResponseStatsCtx(ctx context.Context, samples, histograms, exemplars int) {
prs := ctx.Value(PushResponseStatsContextKey)
if prs == nil {
// Should not happen, but we should not panic anyway.
return
}
prs.(*promRemote.WriteResponseStats).Samples += samples
prs.(*promRemote.WriteResponseStats).Histograms += histograms
prs.(*promRemote.WriteResponseStats).Exemplars += exemplars
}
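
To make the intended flow concrete, here is an illustrative sketch (not part of this diff) of the full round trip: the handler seeds the stats via contextWithWriteResponseStats, the distributor increments them via updateWriteResponseStatsCtx, and the handler then copies the same object into the response headers with addWriteResponseStats.

// exampleWriteResponseStatsRoundTrip is an illustrative sketch, not part of
// this change, showing how the pieces added above are expected to interact.
func exampleWriteResponseStatsRoundTrip(w http.ResponseWriter) {
	// The handler seeds a zero-valued stats object before calling push.
	ctx := contextWithWriteResponseStats(context.Background())

	// The distributor (via updateReceivedMetrics) records what it ingested:
	// here 10 samples, 2 histograms and 1 exemplar.
	updateWriteResponseStatsCtx(ctx, 10, 2, 1)

	// Back in the handler, the same *WriteResponseStats pointer is read from
	// the context and turned into X-Prometheus-Remote-Write-*-Written headers.
	if rs, ok := ctx.Value(PushResponseStatsContextKey).(*promRemote.WriteResponseStats); ok {
		addWriteResponseStats(w, rs)
	}
}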

func calculateRetryAfter(retryAttemptHeader string, minBackoff, maxBackoff time.Duration) string {
const jitterFactor = 0.5
