Skip to content

Commit

Permalink
[receiver/prometheusremotewrite] Parse labels into resource and metri…
Browse files Browse the repository at this point in the history
…c attributes

Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Nov 7, 2024
1 parent ba20b05 commit f862fe3
Show file tree
Hide file tree
Showing 4 changed files with 258 additions and 9 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prwreceiver-parselabels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Parse labels from Prometheus Remote Write requests into Resource and Scope Attributes

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35656]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
8 changes: 8 additions & 0 deletions receiver/prometheusremotewritereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.0
require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0
github.com/prometheus/prometheus v0.54.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
Expand Down Expand Up @@ -54,6 +55,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand Down Expand Up @@ -103,3 +105,9 @@ require (
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden
117 changes: 109 additions & 8 deletions receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@ import (

"github.com/gogo/protobuf/proto"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
promremote "github.com/prometheus/prometheus/storage/remote"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap/zapcore"
)

func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) {
return &prometheusRemoteWriteReceiver{
settings: settings,
nextConsumer: nextConsumer,
config: cfg,
settings: settings,
nextConsumer: nextConsumer,
config: cfg,
jobInstanceCache: make(map[string]pmetric.ResourceMetrics),
server: &http.Server{
ReadTimeout: 60 * time.Second,
},
Expand All @@ -39,8 +42,9 @@ type prometheusRemoteWriteReceiver struct {
settings receiver.Settings
nextConsumer consumer.Metrics

config *Config
server *http.Server
jobInstanceCache map[string]pmetric.ResourceMetrics
config *Config
server *http.Server
}

func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error {
Expand Down Expand Up @@ -150,8 +154,105 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
}

// translateV2 translates a v2 remote-write request into OTLP metrics.
// For now translateV2 is not implemented and returns an empty metrics.
// translate is not feature complete.
// nolint
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
var (
badRequestErrors []error
otelMetrics = pmetric.NewMetrics()
b = labels.NewScratchBuilder(0)
stats = promremote.WriteResponseStats{}
)

for _, ts := range req.Timeseries {
ls := ts.ToLabels(&b, req.Symbols)

if !ls.Has(labels.MetricName) {
badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels"))
continue
} else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate {
badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel))
continue
}

var rm pmetric.ResourceMetrics
// This cache should be populated by the metric 'target_info', but we're not handling it yet.
cacheEntry, ok := prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")]
if ok {
rm = pmetric.NewResourceMetrics()
cacheEntry.CopyTo(rm)
} else {
// A remote-write request can have multiple timeseries with the same instance and job labels.
// While they are different timeseries in Prometheus, we're handling it as the same OTLP metric
// until we support 'target_info'.
rm = otelMetrics.ResourceMetrics().AppendEmpty()
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("instance"), ls.Get("job"))
prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] = rm
}

switch ts.Metadata.Type {
case writev2.Metadata_METRIC_TYPE_COUNTER:
addCounterDatapoints(rm, ls, ts)
case writev2.Metadata_METRIC_TYPE_GAUGE:
addGaugeDatapoints(rm, ls, ts)
case writev2.Metadata_METRIC_TYPE_SUMMARY:
addSummaryDatapoints(rm, ls, ts)
case writev2.Metadata_METRIC_TYPE_HISTOGRAM:
addHistogramDatapoints(rm, ls, ts)
default:
badRequestErrors = append(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName)))
}
}

return otelMetrics, stats, errors.Join(badRequestErrors...)
}

// parseJobAndInstance turns the job and instance labels service resource attributes.
// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/
func parseJobAndInstance(dest pcommon.Map, instance, job string) {
if job != "" {
dest.PutStr("service.namespace", job)
}
if instance != "" {
parts := strings.Split(instance, "/")
if len(parts) == 2 {
dest.PutStr("service.name", parts[0])
dest.PutStr("service.instance.id", parts[1])
return
}
dest.PutStr("service.name", instance)
}
}

func addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
// TODO: Implement this function
}

func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) {
m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge()
addDatapoints(m.DataPoints(), ls, ts)
}

func addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
// TODO: Implement this function
}

func addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) {
// TODO: Implement this function
}

// addDatapoints adds the labels to the datapoints attributes.
// TODO: We're still not handling several fields that make a datapoint complete, e.g. StartTimestamp,
// Timestamp, Value, etc.
func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, _ writev2.TimeSeries) {
attributes := datapoints.AppendEmpty().Attributes()

for _, l := range ls {
if l.Name == "instance" || l.Name == "job" || // Become resource attributes "service.name", "service.instance.id" and "service.namespace"
l.Name == labels.MetricName || // Becomes metric name
l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version
continue
}
attributes.PutStr(l.Name, l.Value)
}
}
115 changes: 114 additions & 1 deletion receiver/prometheusremotewritereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,40 @@ import (
"github.com/golang/snappy"
promconfig "github.com/prometheus/prometheus/config"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

func setupServer(t *testing.T) {
var (
writeV2RequestFixture = &writev2.Request{
Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
Timeseries: []writev2.TimeSeries{
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics.
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
},
{
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance.
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
},
},
}
)

func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
t.Helper()

factory := NewFactory()
Expand All @@ -30,6 +57,13 @@ func setupServer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")

return prwReceiver.(*prometheusRemoteWriteReceiver)
}

func setupServer(t *testing.T) {
t.Helper()

prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

Expand Down Expand Up @@ -98,3 +132,82 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
})
}
}

func TestTranslateV2(t *testing.T) {
prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

for _, tc := range []struct {
name string
request *writev2.Request
expectError string
expectedMetrics pmetric.Metrics
expectedStats remote.WriteResponseStats
}{
{
name: "missing metric name",
request: &writev2.Request{
Symbols: []string{"", "foo", "bar"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: "missing metric name in labels",
},
{
name: "duplicate label",
request: &writev2.Request{
Symbols: []string{"", "__name__", "test"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: `duplicate label "__name__" in labels`,
},
{
name: "valid request",
request: writeV2RequestFixture,
expectedMetrics: func() pmetric.Metrics {
expected := pmetric.NewMetrics()
rm1 := expected.ResourceMetrics().AppendEmpty()
rmAttributes1 := rm1.Resource().Attributes()
rmAttributes1.PutStr("service.namespace", "test")
rmAttributes1.PutStr("service.name", "service-x")
rmAttributes1.PutStr("service.instance.id", "107cn001")
mAttributes1 := rm1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
mAttributes1.PutStr("d", "e")
mAttributes1.PutStr("foo", "bar")

rm2 := expected.ResourceMetrics().AppendEmpty()
rmAttributes2 := rm2.Resource().Attributes()
rmAttributes2.PutStr("service.namespace", "foo")
rmAttributes2.PutStr("service.name", "bar")
mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes()
mAttributes2.PutStr("d", "e")
mAttributes2.PutStr("foo", "bar")

return expected
}(),
expectedStats: remote.WriteResponseStats{},
},
} {
t.Run(tc.name, func(t *testing.T) {
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
if tc.expectError != "" {
assert.ErrorContains(t, err, tc.expectError)
return
}

assert.NoError(t, err)
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
assert.Equal(t, tc.expectedStats, stats)
})
}
}

0 comments on commit f862fe3

Please sign in to comment.