diff --git a/.chloggen/awsfirehosereceiver_otlp_support.yaml b/.chloggen/awsfirehosereceiver_otlp_support.yaml new file mode 100644 index 000000000000..44d764934dbd --- /dev/null +++ b/.chloggen/awsfirehosereceiver_otlp_support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsfirehosereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: added OTLP v1 support to Firehose receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [34982] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/awsfirehosereceiver/README.md b/receiver/awsfirehosereceiver/README.md index a8e6d242775d..6fab04fd95bc 100644 --- a/receiver/awsfirehosereceiver/README.md +++ b/receiver/awsfirehosereceiver/README.md @@ -83,3 +83,6 @@ For example: } ``` +### otlp_v1 +The OTLP v1 format as produced by CloudWatch metric streams. +See [documentation](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html) for details. diff --git a/receiver/awsfirehosereceiver/generated_component_test.go b/receiver/awsfirehosereceiver/generated_component_test.go index 54a22c25a689..64626eb54322 100644 --- a/receiver/awsfirehosereceiver/generated_component_test.go +++ b/receiver/awsfirehosereceiver/generated_component_test.go @@ -34,7 +34,7 @@ func TestComponentLifecycle(t *testing.T) { { name: "logs", createFn: func(ctx context.Context, set receiver.Settings, cfg component.Config) (component.Component, error) { - return factory.CreateLogsReceiver(ctx, set, cfg, consumertest.NewNop()) + return factory.CreateLogs(ctx, set, cfg, consumertest.NewNop()) }, }, diff --git a/receiver/awsfirehosereceiver/go.mod b/receiver/awsfirehosereceiver/go.mod index eaaa109ca040..11b4cfc865a8 100644 --- a/receiver/awsfirehosereceiver/go.mod +++ b/receiver/awsfirehosereceiver/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfir go 1.22.0 require ( + github.com/gogo/protobuf v1.3.2 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae @@ -27,7 +28,6 @@ require ( github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.1.0 // indirect - github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go new file mode 100644 index 000000000000..af10aedc0c78 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlpmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream" + +import ( + "errors" + + "github.com/gogo/protobuf/proto" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" +) + +const ( + // Supported version depends on version of go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp dependency + TypeStr = "otlp_v1" +) + +var ( + errInvalidOTLPFormatStart = errors.New("unable to decode data length from message") +) + +// Unmarshaler for the CloudWatch Metric Stream OpenTelemetry record format. +// +// More details can be found at: +// https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-opentelemetry-100.html +type Unmarshaler struct { + logger *zap.Logger +} + +var _ unmarshaler.MetricsUnmarshaler = (*Unmarshaler)(nil) + +// NewUnmarshaler creates a new instance of the Unmarshaler. +func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { + return &Unmarshaler{logger} +} + +// Unmarshal deserializes the records into pmetric.Metrics +func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { + md := pmetric.NewMetrics() + for recordIndex, record := range records { + var dataLen, pos = len(record), 0 + for pos < dataLen { + n, nLen := proto.DecodeVarint(record) + if nLen == 0 && n == 0 { + return md, errInvalidOTLPFormatStart + } + req := pmetricotlp.NewExportRequest() + pos += nLen + err := req.UnmarshalProto(record[pos : pos+int(n)]) + pos += int(n) + if err != nil { + u.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("record_index", recordIndex), + ) + continue + } + req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics()) + } + } + + return md, nil +} + +// Type of the serialized messages. +func (u Unmarshaler) Type() string { + return TypeStr +} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go new file mode 100644 index 000000000000..a35361d6bf64 --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go @@ -0,0 +1,126 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package otlpmetricstream + +import ( + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.uber.org/zap" +) + +func TestType(t *testing.T) { + unmarshaler := NewUnmarshaler(zap.NewNop()) + require.Equal(t, TypeStr, unmarshaler.Type()) +} + +func createMetricRecord() []byte { + var er = pmetricotlp.NewExportRequest() + var rsm = er.Metrics().ResourceMetrics().AppendEmpty() + var sm = rsm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + sm.SetName("TestMetric") + var dp = sm.SetEmptySummary().DataPoints().AppendEmpty() + dp.SetCount(1) + dp.SetSum(1) + qv := dp.QuantileValues() + min := qv.AppendEmpty() + min.SetQuantile(0) + min.SetValue(0) + max := qv.AppendEmpty() + max.SetQuantile(1) + max.SetValue(1) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + temp, _ := er.MarshalProto() + var record = proto.EncodeVarint(uint64(len(temp))) + record = append(record, temp...) + return record +} + +func TestUnmarshal(t *testing.T) { + unmarshaler := NewUnmarshaler(zap.NewNop()) + testCases := map[string]struct { + records [][]byte + wantResourceCount int + wantMetricCount int + wantDatapointCount int + wantErr error + }{ + "WithSingleRecord": { + records: [][]byte{ + createMetricRecord(), + }, + wantResourceCount: 1, + wantMetricCount: 1, + wantDatapointCount: 1, + }, + "WithMultipleRecords": { + records: [][]byte{ + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + }, + wantResourceCount: 6, + wantMetricCount: 6, + wantDatapointCount: 6, + }, + "WithEmptyRecord": { + records: make([][]byte, 0), + wantResourceCount: 0, + wantMetricCount: 0, + wantDatapointCount: 0, + }, + "WithInvalidRecords": { + records: [][]byte{{1, 2}}, + wantResourceCount: 0, + wantMetricCount: 0, + wantDatapointCount: 0, + }, + "WithSomeInvalidRecords": { + records: [][]byte{ + createMetricRecord(), + {1, 2}, + createMetricRecord(), + }, + wantResourceCount: 2, + wantMetricCount: 2, + wantDatapointCount: 2, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + + got, err := unmarshaler.Unmarshal(testCase.records) + if testCase.wantErr != nil { + require.Error(t, err) + require.Equal(t, testCase.wantErr, err) + } else { + require.NoError(t, err) + require.NotNil(t, got) + require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) + gotMetricCount := 0 + gotDatapointCount := 0 + for i := 0; i < got.ResourceMetrics().Len(); i++ { + rm := got.ResourceMetrics().At(i) + require.Equal(t, 1, rm.ScopeMetrics().Len()) + ilm := rm.ScopeMetrics().At(0) + gotMetricCount += ilm.Metrics().Len() + for j := 0; j < ilm.Metrics().Len(); j++ { + metric := ilm.Metrics().At(j) + gotDatapointCount += metric.Summary().DataPoints().Len() + } + } + require.Equal(t, testCase.wantMetricCount, gotMetricCount) + require.Equal(t, testCase.wantDatapointCount, gotDatapointCount) + } + }) + } +}