POC convert RW2 to RW1
Signed-off-by: György Krajcsovits <[email protected]>
krajorama committed Jan 14, 2025
1 parent 67acabc commit f0b12f6
Showing 8 changed files with 2,724 additions and 293 deletions.
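This commit wires Remote Write 2.0 ingestion into the distributor by converting incoming RW2 requests into the existing RW1 (mimirpb) representation. The core of that conversion is resolving each series' label references against the request-level symbols table, since RW2 interns label names and values while RW1 carries the strings inline. The converter itself is in files that are not loaded below, so the following is only an illustrative sketch: the function name, the error handling, and the exact mimirpb fields used are assumptions, not the committed code.

package rw2sketch

import (
	"fmt"

	promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"

	"github.com/grafana/mimir/pkg/mimirpb"
)

// convertTimeSeries illustrates the RW2 -> RW1 step for one series: RW2 stores
// labels as (name ref, value ref) pairs of indexes into the request's symbols
// table, while the mimirpb (RW1) series carries the label strings inline.
func convertTimeSeries(symbols []string, ts promRW2.TimeSeries) (mimirpb.TimeSeries, error) {
	if len(ts.LabelsRefs)%2 != 0 {
		return mimirpb.TimeSeries{}, fmt.Errorf("odd number of label references: %d", len(ts.LabelsRefs))
	}

	out := mimirpb.TimeSeries{}

	// Resolve each (name, value) reference pair against the symbols table.
	for i := 0; i+1 < len(ts.LabelsRefs); i += 2 {
		nameRef, valueRef := ts.LabelsRefs[i], ts.LabelsRefs[i+1]
		if int(nameRef) >= len(symbols) || int(valueRef) >= len(symbols) {
			return mimirpb.TimeSeries{}, fmt.Errorf("label reference out of range: %d/%d", nameRef, valueRef)
		}
		out.Labels = append(out.Labels, mimirpb.LabelAdapter{Name: symbols[nameRef], Value: symbols[valueRef]})
	}

	// Float samples translate one to one; only the field names differ.
	for _, s := range ts.Samples {
		out.Samples = append(out.Samples, mimirpb.Sample{TimestampMs: s.Timestamp, Value: s.Value})
	}

	return out, nil
}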
140 changes: 140 additions & 0 deletions integration/distributor_test.go
@@ -18,9 +18,11 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/stretchr/testify/require"

"github.com/grafana/mimir/integration/e2emimir"
"github.com/grafana/mimir/pkg/distributor/rw2"
)

func TestDistributor(t *testing.T) {
@@ -370,3 +372,141 @@ overrides:
})
}
}

func TestDistributorRemoteWrite2(t *testing.T) {
queryEnd := time.Now().Round(time.Second)
queryStart := queryEnd.Add(-1 * time.Hour)
queryStep := 10 * time.Minute

testCases := map[string]struct {
inRemoteWrite []*promRW2.Request
runtimeConfig string
queries map[string]model.Matrix
exemplarQueries map[string][]promv1.ExemplarQueryResult
}{
"no special features": {
inRemoteWrite: []*promRW2.Request{
rw2.AddFloatSeries(
nil,
labels.FromStrings("__name__", "foobar"),
[]promRW2.Sample{{Timestamp: queryStart.UnixMilli(), Value: 100}},
promRW2.Metadata_METRIC_TYPE_COUNTER,
"some help",
"someunit",
0,
nil),
},
queries: map[string]model.Matrix{
"foobar": {{
Metric: model.Metric{"__name__": "foobar"},
Values: []model.SamplePair{{Timestamp: model.Time(queryStart.UnixMilli()), Value: model.SampleValue(100)}},
}},
},
},
}

s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

previousRuntimeConfig := ""
require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(previousRuntimeConfig)))

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, blocksBucketName)
require.NoError(t, s.StartAndWaitReady(consul, minio))

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
"-distributor.ha-tracker.enable": "true",
"-distributor.ha-tracker.enable-for-all-users": "true",
"-distributor.ha-tracker.store": "consul",
"-distributor.ha-tracker.consul.hostname": consul.NetworkHTTPEndpoint(),
"-distributor.ha-tracker.prefix": "prom_ha/",
"-timeseries-unmarshal-caching-optimization-enabled": strconv.FormatBool(false), //cachingUnmarshalDataEnabled),
}

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
baseFlags,
)

// We want only the distributor to reload the runtime config.
distributorFlags := mergeFlags(flags, map[string]string{
"-runtime-config.file": filepath.Join(e2e.ContainerSharedDir, "runtime.yaml"),
"-runtime-config.reload-period": "100ms",
// Set a non-zero default for the number of exemplars so that per-test runtime config overrides show up in the runtime config diff.
"-ingester.max-global-exemplars-per-user": "3",
})

// Ingester will not reload runtime config.
ingesterFlags := mergeFlags(flags, map[string]string{
// Ingester will always see exemplars enabled. We do this to avoid waiting for ingester to apply new setting to TSDB.
"-ingester.max-global-exemplars-per-user": "100",
})

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), distributorFlags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ingesterFlags)
querier := e2emimir.NewQuerier("querier", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester, querier))

// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until querier has updated the ring.
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", userID)
require.NoError(t, err)

runtimeConfigURL := fmt.Sprintf("http://%s/runtime_config?mode=diff", distributor.HTTPEndpoint())

for testName, tc := range testCases {
t.Run(testName, func(t *testing.T) {
for _, ser := range tc.inRemoteWrite {
if tc.runtimeConfig != previousRuntimeConfig {
currentRuntimeConfig, err := getURL(runtimeConfigURL)
require.NoError(t, err)

// Write new runtime config
require.NoError(t, writeFileToSharedDir(s, "runtime.yaml", []byte(tc.runtimeConfig)))

// Wait until distributor has reloaded runtime config.
test.Poll(t, 1*time.Second, true, func() interface{} {
newRuntimeConfig, err := getURL(runtimeConfigURL)
require.NoError(t, err)
return currentRuntimeConfig != newRuntimeConfig
})

previousRuntimeConfig = tc.runtimeConfig
}

res, err := client.PushRW2(ser)
require.NoError(t, err)
require.True(t, res.StatusCode == http.StatusOK || res.StatusCode == http.StatusAccepted, res.Status)
}

for q, res := range tc.queries {
result, err := client.QueryRange(q, queryStart, queryEnd, queryStep)
require.NoError(t, err)

require.Equal(t, res.String(), result.String())
}

for q, expResult := range tc.exemplarQueries {
result, err := client.QueryExemplars(q, queryStart, queryEnd)
require.NoError(t, err)

require.Equal(t, expResult, result)
}
})
}
}
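The test above builds its RW2 payloads with rw2.AddFloatSeries, a helper added elsewhere in this commit but not loaded on this page. Inferring its shape from the call site alone, it could plausibly look like the sketch below; the parameter order is taken from the call above, while everything else (the interning strategy, the return value) is an assumption.

package rw2

import (
	"github.com/prometheus/prometheus/model/labels"
	promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
)

// AddFloatSeries appends one float series to req, allocating a new request when
// req is nil, and interns label names/values, help and unit into the request's
// symbols table. Sketch only; the committed helper may differ.
func AddFloatSeries(
	req *promRW2.Request,
	lbls labels.Labels,
	samples []promRW2.Sample,
	metricType promRW2.Metadata_MetricType,
	help string,
	unit string,
	createdTimestamp int64,
	exemplars []promRW2.Exemplar,
) *promRW2.Request {
	if req == nil {
		// By convention, symbol 0 is the empty string in RW2 requests.
		req = &promRW2.Request{Symbols: []string{""}}
	}

	// intern returns the index of s in the symbols table, appending it if new.
	intern := func(s string) uint32 {
		for i, sym := range req.Symbols {
			if sym == s {
				return uint32(i)
			}
		}
		req.Symbols = append(req.Symbols, s)
		return uint32(len(req.Symbols) - 1)
	}

	var refs []uint32
	lbls.Range(func(l labels.Label) {
		refs = append(refs, intern(l.Name), intern(l.Value))
	})

	req.Timeseries = append(req.Timeseries, promRW2.TimeSeries{
		LabelsRefs: refs,
		Samples:    samples,
		Exemplars:  exemplars,
		Metadata: promRW2.Metadata{
			Type:    metricType,
			HelpRef: intern(help),
			UnitRef: intern(unit),
		},
		CreatedTimestamp: createdTimestamp,
	})
	return req
}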
33 changes: 33 additions & 0 deletions integration/e2emimir/client.go
@@ -31,6 +31,7 @@ import (
promConfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/rulefmt"
"github.com/prometheus/prometheus/prompb" // OTLP protos are not compatible with gogo
promRW2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage/remote"
yaml "gopkg.in/yaml.v3"

@@ -186,6 +187,38 @@ func (c *Client) Push(timeseries []prompb.TimeSeries) (*http.Response, error) {
return res, nil
}

// PushRW2 pushes the input Remote Write 2.0 request to the distributor's push endpoint.
func (c *Client) PushRW2(writeRequest *promRW2.Request) (*http.Response, error) {
// Create write request
data, err := proto.Marshal(writeRequest)
if err != nil {
return nil, err
}

// Create HTTP request
compressed := snappy.Encode(nil, data)
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/v1/push", c.distributorAddress), bytes.NewReader(compressed))
if err != nil {
return nil, err
}

req.Header.Add("Content-Encoding", "snappy")
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
req.Header.Set("X-Scope-OrgID", c.orgID)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

// Execute HTTP request
res, err := c.httpClient.Do(req.WithContext(ctx))
if err != nil {
return nil, err
}

defer res.Body.Close()
return res, nil
}

// PushOTLP the input timeseries to the remote endpoint in OTLP format
func (c *Client) PushOTLP(timeseries []prompb.TimeSeries, metadata []mimirpb.MetricMetadata) (*http.Response, error) {
// Create write request
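For reference, a minimal caller of the new PushRW2 method could look like the sketch below. It assumes a test context with an already running Mimir and a client created via e2emimir.NewClient, and reuses imports already present in the integration test above (testing, time, net/http, require, promRW2, e2emimir); the label values are illustrative only.

// pushOneRW2Series is a sketch of how a test might exercise PushRW2. The payload
// layout (symbols table with "" at index 0, label name/value reference pairs)
// follows the RW2 wire format.
func pushOneRW2Series(t *testing.T, client *e2emimir.Client) {
	req := &promRW2.Request{
		Symbols: []string{"", "__name__", "up", "job", "integration"},
		Timeseries: []promRW2.TimeSeries{{
			LabelsRefs: []uint32{1, 2, 3, 4}, // __name__="up", job="integration"
			Samples:    []promRW2.Sample{{Timestamp: time.Now().UnixMilli(), Value: 1}},
		}},
	}

	res, err := client.PushRW2(req)
	require.NoError(t, err)
	// Mirror the acceptance check used by TestDistributorRemoteWrite2 above.
	require.True(t, res.StatusCode == http.StatusOK || res.StatusCode == http.StatusAccepted, res.Status)
}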
13 changes: 8 additions & 5 deletions pkg/distributor/distributor.go
Expand Up @@ -1588,7 +1588,7 @@ func (d *Distributor) push(ctx context.Context, pushReq *Request) error {
return err
}

d.updateReceivedMetrics(req, userID)
d.updateReceivedMetrics(ctx, req, userID)

if len(req.Timeseries) == 0 && len(req.Metadata) == 0 {
return nil
@@ -1819,17 +1819,20 @@ func tokenForMetadata(userID string, metricName string) uint32 {
return mimirpb.ShardByMetricName(userID, metricName)
}

func (d *Distributor) updateReceivedMetrics(req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedExemplars, receivedMetadata int
func (d *Distributor) updateReceivedMetrics(ctx context.Context, req *mimirpb.WriteRequest, userID string) {
var receivedSamples, receivedHistograms, receivedExemplars, receivedMetadata int
for _, ts := range req.Timeseries {
receivedSamples += len(ts.TimeSeries.Samples) + len(ts.TimeSeries.Histograms)
receivedSamples += len(ts.TimeSeries.Samples)
receivedHistograms += len(ts.TimeSeries.Histograms)
receivedExemplars += len(ts.TimeSeries.Exemplars)
}
receivedMetadata = len(req.Metadata)

d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples))
d.receivedSamples.WithLabelValues(userID).Add(float64(receivedSamples + receivedHistograms))
d.receivedExemplars.WithLabelValues(userID).Add(float64(receivedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(receivedMetadata))

updateWriteResponseStatsCtx(ctx, receivedSamples, receivedHistograms, receivedExemplars)
}

// forReplicationSets runs f, in parallel, for all ingesters in the input replicationSets.
(The remaining 5 changed files are not shown here.)
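The new updateWriteResponseStatsCtx call is defined in one of those unloaded files. Its name and arguments suggest the accepted sample, histogram, and exemplar counts are carried on the request context so the push handler can emit the X-Prometheus-Remote-Write-Samples-Written, -Histograms-Written, and -Exemplars-Written response headers defined by the Remote Write 2.0 specification. A minimal sketch of that pattern, with hypothetical names, is:

package push

import (
	"context"
	"net/http"
	"strconv"
)

type rw2StatsKey struct{}

// rw2Stats accumulates what the distributor accepted for a single RW2 request.
type rw2Stats struct {
	samples, histograms, exemplars int
}

// contextWithRW2Stats returns a context carrying a mutable stats record plus the
// record itself, so the HTTP handler can read it back once the push completes.
func contextWithRW2Stats(ctx context.Context) (context.Context, *rw2Stats) {
	s := &rw2Stats{}
	return context.WithValue(ctx, rw2StatsKey{}, s), s
}

// updateWriteResponseStatsCtx records the received counts on the context, if any.
// Hypothetical stand-in for the function called from updateReceivedMetrics above.
func updateWriteResponseStatsCtx(ctx context.Context, samples, histograms, exemplars int) {
	if s, ok := ctx.Value(rw2StatsKey{}).(*rw2Stats); ok {
		s.samples += samples
		s.histograms += histograms
		s.exemplars += exemplars
	}
}

// setRW2ResponseHeaders writes the Remote Write 2.0 response stats headers.
func setRW2ResponseHeaders(w http.ResponseWriter, s *rw2Stats) {
	w.Header().Set("X-Prometheus-Remote-Write-Samples-Written", strconv.Itoa(s.samples))
	w.Header().Set("X-Prometheus-Remote-Write-Histograms-Written", strconv.Itoa(s.histograms))
	w.Header().Set("X-Prometheus-Remote-Write-Exemplars-Written", strconv.Itoa(s.exemplars))
}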
