From efb40d0e752807ddee976440623fde313c80becc Mon Sep 17 00:00:00 2001 From: odubajDT Date: Thu, 31 Oct 2024 11:27:31 +0100 Subject: [PATCH] [processor/resourcedetection] add support for Profiles signal type Signed-off-by: odubajDT --- .../resourcedetectionprocessor-profiles.yaml | 27 ++++++++++++++ .../resourcedetectionprocessor/factory.go | 37 ++++++++++++++++--- .../generated_component_test.go | 21 +++++++++++ processor/resourcedetectionprocessor/go.mod | 7 ++-- processor/resourcedetectionprocessor/go.sum | 2 + .../internal/metadata/generated_status.go | 1 + .../resourcedetectionprocessor/metadata.yaml | 1 + .../resourcedetection_processor.go | 13 +++++++ 8 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 .chloggen/resourcedetectionprocessor-profiles.yaml diff --git a/.chloggen/resourcedetectionprocessor-profiles.yaml b/.chloggen/resourcedetectionprocessor-profiles.yaml new file mode 100644 index 000000000000..11a69f5bab0d --- /dev/null +++ b/.chloggen/resourcedetectionprocessor-profiles.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: resourcedetectionprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Introduce support for Profiles signal type." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35980] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/resourcedetectionprocessor/factory.go b/processor/resourcedetectionprocessor/factory.go index f1ae4b11ac35..37a2e21aa019 100644 --- a/processor/resourcedetectionprocessor/factory.go +++ b/processor/resourcedetectionprocessor/factory.go @@ -12,8 +12,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumerprofiles" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles" + "go.opentelemetry.io/collector/processor/processorprofiles" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal/aws/ec2" @@ -46,7 +49,7 @@ type factory struct { } // NewFactory creates a new factory for ResourceDetection processor. -func NewFactory() processor.Factory { +func NewFactory() processorprofiles.Factory { resourceProviderFactory := internal.NewProviderFactory(map[internal.DetectorType]internal.DetectorFactory{ aks.TypeStr: aks.NewDetector, azure.TypeStr: azure.NewDetector, @@ -70,12 +73,13 @@ func NewFactory() processor.Factory { providers: map[component.ID]*internal.ResourceProvider{}, } - return processor.NewFactory( + return processorprofiles.NewFactory( metadata.Type, createDefaultConfig, - processor.WithTraces(f.createTracesProcessor, metadata.TracesStability), - processor.WithMetrics(f.createMetricsProcessor, metadata.MetricsStability), - processor.WithLogs(f.createLogsProcessor, metadata.LogsStability)) + processorprofiles.WithTraces(f.createTracesProcessor, metadata.TracesStability), + processorprofiles.WithMetrics(f.createMetricsProcessor, metadata.MetricsStability), + processorprofiles.WithLogs(f.createLogsProcessor, metadata.LogsStability), + processorprofiles.WithProfiles(f.createProfilesProcessor, metadata.ProfilesStability)) } // Type gets the type of the Option config created by this factory. @@ -164,6 +168,29 @@ func (f *factory) createLogsProcessor( processorhelper.WithStart(rdp.Start)) } +func (f *factory) createProfilesProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer consumerprofiles.Profiles, +) (processorprofiles.Profiles, error) { + rdp, err := f.getResourceDetectionProcessor(set, cfg) + if err != nil { + return nil, err + } + + return processorhelperprofiles.NewProfiles( + ctx, + set, + cfg, + nextConsumer, + rdp.processProfiles, + processorhelperprofiles.WithCapabilities(consumerCapabilities), + processorhelperprofiles.WithStart(rdp.Start)) +} + +//func(context.Context, processor.Settings, component.Config, consumerprofiles.Profiles) (Profiles, error) + func (f *factory) getResourceDetectionProcessor( params processor.Settings, cfg component.Config, diff --git a/processor/resourcedetectionprocessor/generated_component_test.go b/processor/resourcedetectionprocessor/generated_component_test.go index 5dc2bcdc686a..1010de92d7c5 100644 --- a/processor/resourcedetectionprocessor/generated_component_test.go +++ b/processor/resourcedetectionprocessor/generated_component_test.go @@ -15,8 +15,10 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor" + "go.opentelemetry.io/collector/processor/processorprofiles" "go.opentelemetry.io/collector/processor/processortest" ) @@ -56,6 +58,13 @@ func TestComponentLifecycle(t *testing.T) { return factory.CreateTraces(ctx, set, cfg, consumertest.NewNop()) }, }, + + { + name: "profiles", + createFn: func(ctx context.Context, set processor.Settings, cfg component.Config) (component.Component, error) { + return factory.CreateProfiles(ctx, set, cfg, consumertest.NewNop()) + }, + }, } cm, err := confmaptest.LoadConf("metadata.yaml") @@ -104,6 +113,14 @@ func TestComponentLifecycle(t *testing.T) { traces.MarkReadOnly() } err = e.ConsumeTraces(context.Background(), traces) + case "profiles": + e, ok := c.(processorprofiles.Profiles) + require.True(t, ok) + profiles := generateLifecycleTestProfiles() + if !e.Capabilities().MutatesData { + profiles.MarkReadOnly() + } + err = e.ConsumeProfiles(context.Background(), profiles) } }) require.NoError(t, err) @@ -147,3 +164,7 @@ func generateLifecycleTestTraces() ptrace.Traces { span.SetEndTimestamp(pcommon.NewTimestampFromTime(time.Now())) return traces } + +func generateLifecycleTestProfiles() pprofile.Profiles { + return pprofile.NewProfiles() +} diff --git a/processor/resourcedetectionprocessor/go.mod b/processor/resourcedetectionprocessor/go.mod index 703d2f2ae51b..857d29c1a85d 100644 --- a/processor/resourcedetectionprocessor/go.mod +++ b/processor/resourcedetectionprocessor/go.mod @@ -20,10 +20,14 @@ require ( go.opentelemetry.io/collector/config/configtls v1.18.0 go.opentelemetry.io/collector/confmap v1.18.0 go.opentelemetry.io/collector/consumer v0.112.0 + go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0 go.opentelemetry.io/collector/consumer/consumertest v0.112.0 go.opentelemetry.io/collector/featuregate v1.18.0 go.opentelemetry.io/collector/pdata v1.18.0 + go.opentelemetry.io/collector/pdata/pprofile v0.112.0 go.opentelemetry.io/collector/processor v0.112.0 + go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.0.0-20241030215746-b76b9f75b604 + go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 go.opentelemetry.io/collector/processor/processortest v0.112.0 go.opentelemetry.io/collector/semconv v0.112.0 go.uber.org/goleak v1.3.0 @@ -114,13 +118,10 @@ require ( go.opentelemetry.io/collector/config/configcompression v1.18.0 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.112.0 // indirect go.opentelemetry.io/collector/config/internal v0.112.0 // indirect - go.opentelemetry.io/collector/consumer/consumerprofiles v0.112.0 // indirect go.opentelemetry.io/collector/extension v0.112.0 // indirect go.opentelemetry.io/collector/extension/auth v0.112.0 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect go.opentelemetry.io/collector/pdata/testdata v0.112.0 // indirect go.opentelemetry.io/collector/pipeline v0.112.0 // indirect - go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.31.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.30.0 // indirect diff --git a/processor/resourcedetectionprocessor/go.sum b/processor/resourcedetectionprocessor/go.sum index 2257e0c04304..9307a78d39df 100644 --- a/processor/resourcedetectionprocessor/go.sum +++ b/processor/resourcedetectionprocessor/go.sum @@ -502,6 +502,8 @@ go.opentelemetry.io/collector/pipeline v0.112.0 h1:jqKDdb8k53OLPibvxzX6fmMec0ZHA go.opentelemetry.io/collector/pipeline v0.112.0/go.mod h1:4vOvjVsoYTHVGTbfFwqfnQOSV2K3RKUHofh3jNRc2Mg= go.opentelemetry.io/collector/processor v0.112.0 h1:nMv9DOBYR9MB78ddUgY3A3ytwAwk3t4HQMNIu+w8o0g= go.opentelemetry.io/collector/processor v0.112.0/go.mod h1:AJ8EHq8Z/ev90f4gU6G5ULUncdpWmBRATYk8ioR3pvw= +go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.0.0-20241030215746-b76b9f75b604 h1:vJ4eoxhVw7pKJwYrqiNXVxMjSwcAPDNYB9kXa4ijXQs= +go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.0.0-20241030215746-b76b9f75b604/go.mod h1:L9jBHdFye2OepokuUfvWytp+xg1RAlaPP7aURJ+n4vM= go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 h1:Aef68SAbmBbhbsZZPuZb0ECwkV05vIcHIizGOGbWsbM= go.opentelemetry.io/collector/processor/processorprofiles v0.112.0/go.mod h1:OUS7GcPCvFAIERSUFJLMtj6MSUOTCuS2pGKB7B+OHXs= go.opentelemetry.io/collector/processor/processortest v0.112.0 h1:kW7kZ6EC1YjBiOvdajxN/DxvVljr9MKMemHheoaYcFc= diff --git a/processor/resourcedetectionprocessor/internal/metadata/generated_status.go b/processor/resourcedetectionprocessor/internal/metadata/generated_status.go index ecca969e7db9..440fdb04c319 100644 --- a/processor/resourcedetectionprocessor/internal/metadata/generated_status.go +++ b/processor/resourcedetectionprocessor/internal/metadata/generated_status.go @@ -15,4 +15,5 @@ const ( TracesStability = component.StabilityLevelBeta MetricsStability = component.StabilityLevelBeta LogsStability = component.StabilityLevelBeta + ProfilesStability = component.StabilityLevelDevelopment ) diff --git a/processor/resourcedetectionprocessor/metadata.yaml b/processor/resourcedetectionprocessor/metadata.yaml index fee99b032daf..9e98d1f5cbd7 100644 --- a/processor/resourcedetectionprocessor/metadata.yaml +++ b/processor/resourcedetectionprocessor/metadata.yaml @@ -4,6 +4,7 @@ status: class: processor stability: beta: [traces, metrics, logs] + development: [profiles] distributions: [contrib, k8s] codeowners: active: [Aneurysm9, dashpole] diff --git a/processor/resourcedetectionprocessor/resourcedetection_processor.go b/processor/resourcedetectionprocessor/resourcedetection_processor.go index 44f3331e6473..40d2939ad354 100644 --- a/processor/resourcedetectionprocessor/resourcedetection_processor.go +++ b/processor/resourcedetectionprocessor/resourcedetection_processor.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal" @@ -69,3 +70,15 @@ func (rdp *resourceDetectionProcessor) processLogs(_ context.Context, ld plog.Lo } return ld, nil } + +// processProfiles implements the ProcessProfilesFunc type. +func (rdp *resourceDetectionProcessor) processProfiles(_ context.Context, ld pprofile.Profiles) (pprofile.Profiles, error) { + rl := ld.ResourceProfiles() + for i := 0; i < rl.Len(); i++ { + rss := rl.At(i) + rss.SetSchemaUrl(internal.MergeSchemaURL(rss.SchemaUrl(), rdp.schemaURL)) + res := rss.Resource() + internal.MergeResource(res, rdp.resource, rdp.override) + } + return ld, nil +}