From 2e5488af4fc582b6f63afa4ec0543f31aef161c5 Mon Sep 17 00:00:00 2001
From: SungJin1212
Date: Wed, 6 Nov 2024 07:05:03 +0900
Subject: [PATCH] Change otlp attribute conversion to be consistent with prometheus (#6272)

Signed-off-by: SungJin1212
---
 CHANGELOG.md                                |   4 +
 docs/configuration/config-file-reference.md |  15 ++
 integration/e2ecortex/client.go             |  10 +-
 integration/otlp_test.go                    | 110 ++++++++++
 pkg/api/api.go                              |   5 +-
 pkg/cortex/modules.go                       |   2 +-
 pkg/distributor/distributor.go              |  11 +
 pkg/util/push/otlp.go                       |  72 +++++--
 pkg/util/push/otlp_test.go                  | 226 +++++++++++++++++++-
 pkg/util/validation/limits.go               |   7 +
 10 files changed, 430 insertions(+), 32 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63cddfebb1..27e076137b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@

 ## master / unreleased

+* [CHANGE] OTLP: Change OTLP handler to be consistent with the Prometheus OTLP handler. #6272
+- The `target_info` metric is enabled by default and can be disabled via the `-distributor.otlp.disable-target-info=true` flag
+- Converting all attributes to labels is disabled by default and can be enabled via the `-distributor.otlp.convert-all-attributes=true` flag
+- You can specify the attributes converted to labels via the `-distributor.promote-resource-attributes` flag. This is only supported when `-distributor.otlp.convert-all-attributes=false`
 * [CHANGE] Change all max async concurrency default values `50` to `3` #6268
 * [CHANGE] Change default value of `-blocks-storage.bucket-store.index-cache.multilevel.max-async-concurrency` from `50` to `3` #6265
 * [CHANGE] Enable Compactor and Alertmanager in target all. #6204
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index 28b98a6d18..42da5e4a6f 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -2674,6 +2674,16 @@ instance_limits:
   # unlimited.
   # CLI flag: -distributor.instance-limits.max-inflight-push-requests
   [max_inflight_push_requests: <int> | default = 0]
+
+otlp:
+  # If true, all resource attributes are converted to labels.
+  # CLI flag: -distributor.otlp.convert-all-attributes
+  [convert_all_attributes: <boolean> | default = false]
+
+  # If true, a target_info metric is not ingested. (refer to:
+  # https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)
+  # CLI flag: -distributor.otlp.disable-target-info
+  [disable_target_info: <boolean> | default = false]
 ```

 ### `etcd_config`
@@ -3319,6 +3329,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
 # CLI flag: -validation.max-native-histogram-buckets
 [max_native_histogram_buckets: <int> | default = 0]

+# Comma separated list of resource attributes that should be converted to
+# labels.
+# CLI flag: -distributor.promote-resource-attributes
+[promote_resource_attributes: <list of string> | default = []]
+
 # The maximum number of active series per user, per ingester. 0 to disable.
# CLI flag: -ingester.max-series-per-user [max_series_per_user: | default = 5000000] diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 4ff4fa506a..60881d1008 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -231,7 +231,7 @@ func convertTimeseriesToMetrics(timeseries []prompb.TimeSeries) pmetric.Metrics return metrics } -func otlpWriteRequest(name string) pmetricotlp.ExportRequest { +func otlpWriteRequest(name string, labels ...prompb.Label) pmetricotlp.ExportRequest { d := pmetric.NewMetrics() // Generate One Counter, One Gauge, One Histogram, One Exponential-Histogram @@ -244,6 +244,9 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest { resourceMetric.Resource().Attributes().PutStr("service.name", "test-service") resourceMetric.Resource().Attributes().PutStr("service.instance.id", "test-instance") resourceMetric.Resource().Attributes().PutStr("host.name", "test-host") + for _, label := range labels { + resourceMetric.Resource().Attributes().PutStr(label.Name, label.Value) + } scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() @@ -258,7 +261,6 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest { counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty() counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) counterDataPoint.SetDoubleValue(10.0) - counterDataPoint.Attributes().PutStr("foo.bar", "baz") counterExemplar := counterDataPoint.Exemplars().AppendEmpty() counterExemplar.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) @@ -269,8 +271,8 @@ func otlpWriteRequest(name string) pmetricotlp.ExportRequest { return pmetricotlp.NewExportRequestFromMetrics(d) } -func (c *Client) OTLPPushExemplar(name string) (*http.Response, error) { - data, err := otlpWriteRequest(name).MarshalProto() +func (c *Client) OTLPPushExemplar(name string, labels ...prompb.Label) (*http.Response, error) { + data, err := otlpWriteRequest(name, labels...).MarshalProto() if err != nil { return nil, err } diff --git a/integration/otlp_test.go b/integration/otlp_test.go index 463d4d24d5..bf3ceb36e3 100644 --- a/integration/otlp_test.go +++ b/integration/otlp_test.go @@ -4,6 +4,8 @@ package integration import ( + "bytes" + "context" "fmt" "math/rand" "path/filepath" @@ -15,6 +17,7 @@ import ( "github.com/prometheus/prometheus/tsdb/tsdbutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore/providers/s3" "github.com/cortexproject/cortex/integration/e2e" e2edb "github.com/cortexproject/cortex/integration/e2e/db" @@ -144,3 +147,110 @@ func TestOTLPIngestExemplar(t *testing.T) { require.NoError(t, err) require.Equal(t, 1, len(exemplars)) } + +func TestOTLPPromoteResourceAttributesPerTenant(t *testing.T) { + configFileName := "runtime-config.yaml" + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + minio := e2edb.NewMinio(9000, bucketName) + require.NoError(t, s.StartAndWaitReady(minio)) + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. 
+ flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-auth.enabled": "true", + "-runtime-config.backend": "s3", + "-runtime-config.s3.access-key-id": e2edb.MinioAccessKey, + "-runtime-config.s3.secret-access-key": e2edb.MinioSecretKey, + "-runtime-config.s3.bucket-name": bucketName, + "-runtime-config.s3.endpoint": fmt.Sprintf("%s-minio-9000:9000", networkName), + "-runtime-config.s3.insecure": "true", + "-runtime-config.file": configFileName, + "-runtime-config.reload-period": "1s", + + // Distributor + "-distributor.otlp.convert-all-attributes": "false", + "-distributor.promote-resource-attributes": "attr1,attr2,attr3", + + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + "-alertmanager-storage.backend": "local", + "-alertmanager-storage.local.path": filepath.Join(e2e.ContainerSharedDir, "alertmanager_configs"), + }) + + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + client, err := s3.NewBucketWithConfig(nil, s3.Config{ + Endpoint: minio.HTTPEndpoint(), + Insecure: true, + Bucket: bucketName, + AccessKey: e2edb.MinioAccessKey, + SecretKey: e2edb.MinioSecretKey, + }, "runtime-config-test", nil) + + require.NoError(t, err) + + // update runtime config + newRuntimeConfig := []byte(`overrides: + user-1: + promote_resource_attributes: ["attr1"] + user-2: + promote_resource_attributes: ["attr1", "attr2"] +`) + require.NoError(t, client.Upload(context.Background(), configFileName, bytes.NewReader(newRuntimeConfig))) + time.Sleep(2 * time.Second) + + require.NoError(t, copyFileToSharedDir(s, "docs/configuration/single-process-config-blocks-local.yaml", cortexConfigFile)) + + // start cortex and assert runtime-config is loaded correctly + cortex := e2ecortex.NewSingleBinaryWithConfigFile("cortex", cortexConfigFile, flags, "", 9009, 9095) + require.NoError(t, s.StartAndWaitReady(cortex)) + + c1, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + c2, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-2") + require.NoError(t, err) + + c3, err := e2ecortex.NewClient(cortex.HTTPEndpoint(), cortex.HTTPEndpoint(), "", "", "user-3") + require.NoError(t, err) + + // Push some series to Cortex. + now := time.Now() + + labels := []prompb.Label{ + {Name: "service.name", Value: "test-service"}, + {Name: "attr1", Value: "value"}, + {Name: "attr2", Value: "value"}, + {Name: "attr3", Value: "value"}, + } + + res, err := c1.OTLPPushExemplar("series_1", labels...) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c2.OTLPPushExemplar("series_1", labels...) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c3.OTLPPushExemplar("series_1", labels...) 
+ require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + labelSet1, err := c1.LabelNames(now.Add(-time.Minute*5), now, "series_1") + require.NoError(t, err) + require.Equal(t, labelSet1, []string{"__name__", "attr1", "instance", "job"}) + + labelSet2, err := c2.LabelNames(now.Add(-time.Minute*5), now, "series_1") + require.NoError(t, err) + require.Equal(t, labelSet2, []string{"__name__", "attr1", "attr2", "instance", "job"}) + + labelSet3, err := c3.LabelNames(now.Add(-time.Minute*5), now, "series_1") + require.NoError(t, err) + require.Equal(t, labelSet3, []string{"__name__", "attr1", "attr2", "attr3", "instance", "job"}) +} diff --git a/pkg/api/api.go b/pkg/api/api.go index 92b6e1f368..9de60b9bc4 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -40,6 +40,7 @@ import ( "github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/push" + "github.com/cortexproject/cortex/pkg/util/validation" ) // DistributorPushWrapper wraps around a push. It is similar to middleware.Interface. @@ -273,11 +274,11 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) { } // RegisterDistributor registers the endpoints associated with the distributor. -func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config) { +func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) { distributorpb.RegisterDistributorServer(a.server.GRPC, d) a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") - a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") + a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status") a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/all_user_stats", "Usage Statistics") diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index b1450d30b4..9088d2059a 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -246,7 +246,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) { } func (t *Cortex) initDistributor() (serv services.Service, err error) { - t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor) + t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides) return nil, nil } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 699035e672..8f6b97aa5d 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -164,6 +164,9 @@ type Config struct { // Limits for distributor InstanceLimits InstanceLimits `yaml:"instance_limits"` + + // OTLPConfig + OTLPConfig OTLPConfig `yaml:"otlp"` } type InstanceLimits struct { @@ -171,6 +174,11 @@ type InstanceLimits struct { MaxInflightPushRequests int `yaml:"max_inflight_push_requests"` } +type OTLPConfig struct { + ConvertAllAttributes bool `yaml:"convert_all_attributes"` + DisableTargetInfo bool `yaml:"disable_target_info"` +} + // RegisterFlags adds the flags required to config this to the given FlagSet func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.PoolConfig.RegisterFlags(f) @@ -188,6 +196,9 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { 
f.Float64Var(&cfg.InstanceLimits.MaxIngestionRate, "distributor.instance-limits.max-ingestion-rate", 0, "Max ingestion rate (samples/sec) that this distributor will accept. This limit is per-distributor, not per-tenant. Additional push requests will be rejected. Current ingestion rate is computed as exponentially weighted moving average, updated every second. 0 = unlimited.") f.IntVar(&cfg.InstanceLimits.MaxInflightPushRequests, "distributor.instance-limits.max-inflight-push-requests", 0, "Max inflight push requests that this distributor can handle. This limit is per-distributor, not per-tenant. Additional requests will be rejected. 0 = unlimited.") + + f.BoolVar(&cfg.OTLPConfig.ConvertAllAttributes, "distributor.otlp.convert-all-attributes", false, "If true, all resource attributes are converted to labels.") + f.BoolVar(&cfg.OTLPConfig.DisableTargetInfo, "distributor.otlp.disable-target-info", false, "If true, a target_info metric is not ingested. (refer to: https://github.com/prometheus/OpenMetrics/blob/main/specification/OpenMetrics.md#supporting-target-metadata-in-both-push-based-and-pull-based-systems)") } // Validate config and returns error on failure diff --git a/pkg/util/push/otlp.go b/pkg/util/push/otlp.go index 5bcca02732..e6e7fef6ce 100644 --- a/pkg/util/push/otlp.go +++ b/pkg/util/push/otlp.go @@ -1,55 +1,50 @@ package push import ( + "context" "net/http" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite" + "github.com/prometheus/prometheus/util/annotations" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/middleware" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/log" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/validation" ) // OTLPHandler is a http.Handler which accepts OTLP metrics. 
-func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { +func OTLPHandler(overrides *validation.Overrides, cfg distributor.OTLPConfig, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - logger := log.WithContext(ctx, log.Logger) + logger := util_log.WithContext(ctx, util_log.Logger) if sourceIPs != nil { source := sourceIPs.Get(r) if source != "" { ctx = util.AddSourceIPsToOutgoingContext(ctx, source) - logger = log.WithSourceIPs(source, logger) + logger = util_log.WithSourceIPs(source, logger) } } - req, err := remote.DecodeOTLPWriteRequest(r) + + userID, err := tenant.TenantID(ctx) if err != nil { - level.Error(logger).Log("err", err.Error()) - http.Error(w, err.Error(), http.StatusBadRequest) return } - promConverter := prometheusremotewrite.NewPrometheusConverter() - setting := prometheusremotewrite.Settings{ - AddMetricSuffixes: true, - DisableTargetInfo: true, - } - annots, err := promConverter.FromMetrics(ctx, convertToMetricsAttributes(req.Metrics()), setting) - ws, _ := annots.AsStrings("", 0, 0) - if len(ws) > 0 { - level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws) - } - + req, err := remote.DecodeOTLPWriteRequest(r) if err != nil { - level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) + level.Error(logger).Log("err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) return } @@ -60,8 +55,16 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle SkipLabelNameValidation: false, } + // otlp to prompb TimeSeries + promTsList, err := convertToPromTS(r.Context(), req.Metrics(), cfg, overrides, userID, logger) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // convert prompb to cortexpb TimeSeries tsList := []cortexpb.PreallocTimeseries(nil) - for _, v := range promConverter.TimeSeries() { + for _, v := range promTsList { tsList = append(tsList, cortexpb.PreallocTimeseries{TimeSeries: &cortexpb.TimeSeries{ Labels: makeLabels(v.Labels), Samples: makeSamples(v.Samples), @@ -87,6 +90,35 @@ func OTLPHandler(sourceIPs *middleware.SourceIPExtractor, push Func) http.Handle }) } +func convertToPromTS(ctx context.Context, pmetrics pmetric.Metrics, cfg distributor.OTLPConfig, overrides *validation.Overrides, userID string, logger log.Logger) ([]prompb.TimeSeries, error) { + promConverter := prometheusremotewrite.NewPrometheusConverter() + settings := prometheusremotewrite.Settings{ + AddMetricSuffixes: true, + DisableTargetInfo: cfg.DisableTargetInfo, + } + + var annots annotations.Annotations + var err error + + if cfg.ConvertAllAttributes { + annots, err = promConverter.FromMetrics(ctx, convertToMetricsAttributes(pmetrics), settings) + } else { + settings.PromoteResourceAttributes = overrides.PromoteResourceAttributes(userID) + annots, err = promConverter.FromMetrics(ctx, pmetrics, settings) + } + + ws, _ := annots.AsStrings("", 0, 0) + if len(ws) > 0 { + level.Warn(logger).Log("msg", "Warnings translating OTLP metrics to Prometheus write request", "warnings", ws) + } + + if err != nil { + level.Error(logger).Log("msg", "Error translating OTLP metrics to Prometheus write request", "err", err) + return nil, err + } + return promConverter.TimeSeries(), nil +} + func makeLabels(in []prompb.Label) []cortexpb.LabelAdapter { out := make(labels.Labels, 0, len(in)) for _, l 
:= range in { diff --git a/pkg/util/push/otlp_test.go b/pkg/util/push/otlp_test.go index 33e0dd3d32..40b42f3fee 100644 --- a/pkg/util/push/otlp_test.go +++ b/pkg/util/push/otlp_test.go @@ -8,28 +8,239 @@ import ( "testing" "time" + "github.com/go-kit/log" + "github.com/prometheus/prometheus/prompb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/distributor" + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/util/validation" ) +func TestOTLPConvertToPromTS(t *testing.T) { + logger := log.NewNopLogger() + ctx := context.Background() + d := pmetric.NewMetrics() + resourceMetric := d.ResourceMetrics().AppendEmpty() + resourceMetric.Resource().Attributes().PutStr("service.name", "test-service") // converted to job, service_name + resourceMetric.Resource().Attributes().PutStr("attr1", "value") + resourceMetric.Resource().Attributes().PutStr("attr2", "value") + resourceMetric.Resource().Attributes().PutStr("attr3", "value") + + scopeMetric := resourceMetric.ScopeMetrics().AppendEmpty() + + //Generate One Counter + timestamp := time.Now() + counterMetric := scopeMetric.Metrics().AppendEmpty() + counterMetric.SetName("test-counter") + counterMetric.SetDescription("test-counter-description") + counterMetric.SetEmptySum() + counterMetric.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative) + counterMetric.Sum().SetIsMonotonic(true) + + counterDataPoint := counterMetric.Sum().DataPoints().AppendEmpty() + counterDataPoint.SetTimestamp(pcommon.NewTimestampFromTime(timestamp)) + counterDataPoint.SetDoubleValue(10.0) + + tests := []struct { + description string + PromoteResourceAttributes []string + cfg distributor.OTLPConfig + expectedLabels []prompb.Label + }{ + { + description: "target_info should be generated and an attribute that exist in promote resource attributes should be converted", + PromoteResourceAttributes: []string{"attr1"}, + cfg: distributor.OTLPConfig{ + ConvertAllAttributes: false, + DisableTargetInfo: false, + }, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_counter_total", + }, + { + Name: "attr1", + Value: "value", + }, + { + Name: "job", + Value: "test-service", + }, + }, + }, + { + description: "an attributes that exist in promote resource attributes should be converted", + PromoteResourceAttributes: []string{"attr1"}, + cfg: distributor.OTLPConfig{ + ConvertAllAttributes: false, + DisableTargetInfo: true, + }, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_counter_total", + }, + { + Name: "attr1", + Value: "value", + }, + { + Name: "job", + Value: "test-service", + }, + }, + }, + { + description: "not exist attribute is ignored", + PromoteResourceAttributes: []string{"dummy"}, + cfg: distributor.OTLPConfig{ + ConvertAllAttributes: false, + DisableTargetInfo: true, + }, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_counter_total", + }, + { + Name: "job", + Value: "test-service", + }, + }, + }, + { + description: "should convert all attribute", + PromoteResourceAttributes: nil, + cfg: distributor.OTLPConfig{ + ConvertAllAttributes: true, + DisableTargetInfo: true, + }, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: 
"test_counter_total", + }, + { + Name: "attr1", + Value: "value", + }, + { + Name: "attr2", + Value: "value", + }, + { + Name: "attr3", + Value: "value", + }, + { + Name: "job", + Value: "test-service", + }, + { + Name: "service_name", + Value: "test-service", + }, + }, + }, + { + description: "should convert all attribute regardless of promote resource attributes", + PromoteResourceAttributes: []string{"attr1", "attr2"}, + cfg: distributor.OTLPConfig{ + ConvertAllAttributes: true, + DisableTargetInfo: true, + }, + expectedLabels: []prompb.Label{ + { + Name: "__name__", + Value: "test_counter_total", + }, + { + Name: "attr1", + Value: "value", + }, + { + Name: "attr2", + Value: "value", + }, + { + Name: "attr3", + Value: "value", + }, + { + Name: "job", + Value: "test-service", + }, + { + Name: "service_name", + Value: "test-service", + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + limits := validation.Limits{ + PromoteResourceAttributes: test.PromoteResourceAttributes, + } + overrides, err := validation.NewOverrides(limits, nil) + require.NoError(t, err) + tsList, err := convertToPromTS(ctx, d, test.cfg, overrides, "user-1", logger) + require.NoError(t, err) + + if test.cfg.DisableTargetInfo { + require.Equal(t, 1, len(tsList)) // test_counter_total + } else { + // target_info should exist + require.Equal(t, 2, len(tsList)) // test_counter_total + target_info + } + + var counterTs prompb.TimeSeries + for _, ts := range tsList { + for _, label := range ts.Labels { + if label.Name == "__name__" && label.Value == "test_counter_total" { + // get counter ts + counterTs = ts + } + } + } + + require.ElementsMatch(t, test.expectedLabels, counterTs.Labels) + }) + } +} + func TestOTLPWriteHandler(t *testing.T) { + cfg := distributor.OTLPConfig{ + ConvertAllAttributes: false, + DisableTargetInfo: false, + } + exportRequest := generateOTLPWriteRequest(t) t.Run("Test proto format write", func(t *testing.T) { buf, err := exportRequest.MarshalProto() require.NoError(t, err) - req, err := http.NewRequest("", "", bytes.NewReader(buf)) + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + req, err := http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) require.NoError(t, err) req.Header.Set("Content-Type", "application/x-protobuf") push := verifyOTLPWriteRequestHandler(t, cortexpb.API) - handler := OTLPHandler(nil, push) + overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) + require.NoError(t, err) + handler := OTLPHandler(overrides, cfg, nil, push) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -41,12 +252,17 @@ func TestOTLPWriteHandler(t *testing.T) { buf, err := exportRequest.MarshalJSON() require.NoError(t, err) - req, err := http.NewRequest("", "", bytes.NewReader(buf)) + ctx := context.Background() + ctx = user.InjectOrgID(ctx, "user-1") + + req, err := http.NewRequestWithContext(ctx, "", "", bytes.NewReader(buf)) require.NoError(t, err) req.Header.Set("Content-Type", "application/json") push := verifyOTLPWriteRequestHandler(t, cortexpb.API) - handler := OTLPHandler(nil, push) + overrides, err := validation.NewOverrides(querier.DefaultLimitsConfig(), nil) + require.NoError(t, err) + handler := OTLPHandler(overrides, cfg, nil, push) recorder := httptest.NewRecorder() handler.ServeHTTP(recorder, req) @@ -139,7 +355,7 @@ func generateOTLPWriteRequest(t *testing.T) pmetricotlp.ExportRequest { func verifyOTLPWriteRequestHandler(t *testing.T, expectSource 
cortexpb.WriteRequest_SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { t.Helper() return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) { - assert.Len(t, request.Timeseries, 12) // 1 (counter) + 1 (gauge) + 7 (hist_bucket) + 2 (hist_sum, hist_count) + 1 (exponential histogram) + assert.Len(t, request.Timeseries, 13) // 1 (target_info) + 1 (counter) + 1 (gauge) + 7 (hist_bucket) + 2 (hist_sum, hist_count) + 1 (exponential histogram) // TODO: test more things assert.Equal(t, expectSource, request.Source) assert.False(t, request.SkipLabelNameValidation) diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 87173abb13..d63d02908f 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -136,6 +136,7 @@ type Limits struct { IngestionTenantShardSize int `yaml:"ingestion_tenant_shard_size" json:"ingestion_tenant_shard_size"` MetricRelabelConfigs []*relabel.Config `yaml:"metric_relabel_configs,omitempty" json:"metric_relabel_configs,omitempty" doc:"nocli|description=List of metric relabel configurations. Note that in most situations, it is more effective to use metrics relabeling directly in the Prometheus server, e.g. remote_write.write_relabel_configs."` MaxNativeHistogramBuckets int `yaml:"max_native_histogram_buckets" json:"max_native_histogram_buckets"` + PromoteResourceAttributes []string `yaml:"promote_resource_attributes" json:"promote_resource_attributes"` // Ingester enforced limits. // Series @@ -223,6 +224,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.") f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.") f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.") + f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.") f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.") f.IntVar(&l.MaxLabelNameLength, "validation.max-length-label-name", 1024, "Maximum length accepted for label names") f.IntVar(&l.MaxLabelValueLength, "validation.max-length-label-value", 2048, "Maximum length accepted for label value. This setting also applies to the metric name") @@ -754,6 +756,11 @@ func (o *Overrides) MaxGlobalMetadataPerMetric(userID string) int { return o.GetOverridesForUser(userID).MaxGlobalMetadataPerMetric } +// PromoteResourceAttributes returns the promote resource attributes for a given user. +func (o *Overrides) PromoteResourceAttributes(userID string) []string { + return o.GetOverridesForUser(userID).PromoteResourceAttributes +} + // IngestionTenantShardSize returns the ingesters shard size for a given user. func (o *Overrides) IngestionTenantShardSize(userID string) int { return o.GetOverridesForUser(userID).IngestionTenantShardSize
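For reference, a minimal, illustrative sketch of how the options introduced by this patch fit together: the `otlp` block under `distributor` controls conversion behaviour, `promote_resource_attributes` under `limits` sets the default list of promoted resource attributes, and the runtime config can override that list per tenant (as exercised by `TestOTLPPromoteResourceAttributesPerTenant` above). Field and flag names are taken from this diff; the file layout and attribute values (`attr1`, `attr2`) are illustrative only, not part of the patch.

```yaml
# Main Cortex configuration (equivalent CLI flags:
#   -distributor.otlp.convert-all-attributes,
#   -distributor.otlp.disable-target-info,
#   -distributor.promote-resource-attributes).
distributor:
  otlp:
    convert_all_attributes: false  # only promoted attributes become labels
    disable_target_info: false     # keep ingesting the target_info metric

limits:
  # Default list of resource attributes promoted to labels when
  # convert_all_attributes is false.
  promote_resource_attributes: ["attr1", "attr2"]
```

```yaml
# Separate runtime-config file (-runtime-config.file): per-tenant overrides
# of the promoted attributes, reloaded while Cortex is running.
overrides:
  user-1:
    promote_resource_attributes: ["attr1"]
  user-2:
    promote_resource_attributes: ["attr1", "attr2"]
```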