From 8d9f682e3dacf08600649f5f2f4c6c01b8feabbd Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Mon, 21 Oct 2024 22:23:23 -0700 Subject: [PATCH 1/3] [receiver/chrony] Move chronyreceiver to beta (#35913) #### Description Chronyreceiver has been in alpha for a while and seems ready to move to beta. Opening this PR for discussion. --- .chloggen/move_chronyreceiver_to_beta.yaml | 27 +++++++++++++++++++ receiver/chronyreceiver/README.md | 4 +-- .../internal/metadata/generated_status.go | 2 +- receiver/chronyreceiver/metadata.yaml | 2 +- 4 files changed, 31 insertions(+), 4 deletions(-) create mode 100644 .chloggen/move_chronyreceiver_to_beta.yaml diff --git a/.chloggen/move_chronyreceiver_to_beta.yaml b/.chloggen/move_chronyreceiver_to_beta.yaml new file mode 100644 index 000000000000..7e916b816dfb --- /dev/null +++ b/.chloggen/move_chronyreceiver_to_beta.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: chronyreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Move chronyreceiver to beta + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35913] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/chronyreceiver/README.md b/receiver/chronyreceiver/README.md index 7d1b553af819..bde66d45c467 100644 --- a/receiver/chronyreceiver/README.md +++ b/receiver/chronyreceiver/README.md @@ -3,12 +3,12 @@ | Status | | | ------------- |-----------| -| Stability | [alpha]: metrics | +| Stability | [beta]: metrics | | Distributions | [contrib] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fchrony%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fchrony) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fchrony%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fchrony) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@jamesmoessis](https://www.github.com/jamesmoessis) | -[alpha]: https://github.com/open-telemetry/opentelemetry-collector#alpha +[beta]: https://github.com/open-telemetry/opentelemetry-collector#beta [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib diff --git a/receiver/chronyreceiver/internal/metadata/generated_status.go b/receiver/chronyreceiver/internal/metadata/generated_status.go index a6509c4100d7..145fbbfd0e14 100644 --- a/receiver/chronyreceiver/internal/metadata/generated_status.go +++ b/receiver/chronyreceiver/internal/metadata/generated_status.go @@ -12,5 +12,5 @@ var ( ) const ( - MetricsStability = component.StabilityLevelAlpha + MetricsStability = component.StabilityLevelBeta ) diff --git a/receiver/chronyreceiver/metadata.yaml b/receiver/chronyreceiver/metadata.yaml index 004324302111..5cb730e4d167 100644 --- a/receiver/chronyreceiver/metadata.yaml +++ b/receiver/chronyreceiver/metadata.yaml @@ -3,7 +3,7 @@ type: chrony status: class: receiver stability: - alpha: [metrics] + beta: [metrics] distributions: [contrib] codeowners: active: [MovieStoreGuy, jamesmoessis] From 58a77dbabd46a0e4be7cecfca23494036ba827b6 Mon Sep 17 00:00:00 2001 From: Dakota Paasman <122491662+dpaasman00@users.noreply.github.com> Date: Tue, 22 Oct 2024 09:11:15 -0400 Subject: [PATCH 2/3] [receiver/kafkareceiver] fix: Kafka receiver blocking shutdown (#35767) #### Description Fixes an issue where the Kafka receiver would block on shutdown. There was an earlier fix for this issue [here](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/32720). This does solve the issue, but it was only applied to the traces receiver, not the logs or metrics receiver. The issue is this go routine in the `Start()` functions for logs and metrics: ```go go func() { if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() ``` The `consumeLoop()` function returns a `context.Canceled` error when `Shutdown()` is called, which is expected. However `componentstatus.ReportStatus()` blocks while attempting to report this error. The reason/bug for this can be found [here](https://github.com/open-telemetry/opentelemetry-collector/issues/9824). The previously mentioned PR fixed this for the traces receiver by checking if the error returned by `consumeLoop()` is `context.Canceled`: ```go go func() { if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() ``` Additionally, this is `consumeLoop()` for the traces receiver, with the logs and metrics versions being identical: ```go func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil { c.settings.Logger.Error("Error from consumer", zap.Error(err)) } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) return ctx.Err() } } } ``` This does fix the issue, however the only error that can be returned by `consumeLoop()` is a canceled context. When we create the context and cancel function, we use `context.Background()`: ```go ctx, cancel := context.WithCancel(context.Background()) ``` This context is only used by `consumeLoop()` and the cancel function is only called in `Shutdown()`. Because `consumeLoop()` can only return a `context.Canceled` error, this PR removes this unused code for the logs, metrics, and traces receivers. Instead, `consumeLoop()` still logs the `context.Canceled` error but it does not return any error and the go routine simply just calls `consumeLoop()`. Additional motivation for removing the call to `componentstatus.ReportStatus()` is the underlying function called by it, `componentstatus.Report()` says it does not need to be called during `Shutdown()` or `Start()` as the service already does so for the given component, [comment here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/componentstatus/status.go#L21-L25). Even if there wasn't a bug causing this call to block, the component still shouldn't call it since it would only be called during `Shutdown()`. #### Link to tracking issue Fixes #30789 #### Testing Tested in a build of the collector with these changes scraping logs from a Kafka instance. When the collector is stopped and `Shutdown()` gets called, the receiver did not block and the collector stopped gracefully as expected. --- .../fix-kafka-recv-blocking-shutdown.yaml | 27 +++++++++++ receiver/kafkareceiver/go.mod | 1 - receiver/kafkareceiver/go.sum | 2 - receiver/kafkareceiver/kafka_receiver.go | 47 ++++++++++--------- receiver/kafkareceiver/kafka_receiver_test.go | 23 ++++++--- 5 files changed, 68 insertions(+), 32 deletions(-) create mode 100644 .chloggen/fix-kafka-recv-blocking-shutdown.yaml diff --git a/.chloggen/fix-kafka-recv-blocking-shutdown.yaml b/.chloggen/fix-kafka-recv-blocking-shutdown.yaml new file mode 100644 index 000000000000..d48a75cfe954 --- /dev/null +++ b/.chloggen/fix-kafka-recv-blocking-shutdown.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fixes issue causing kafkareceiver to block during Shutdown(). + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30789] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index d14e2e8f35e1..6da4d67c9d37 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -17,7 +17,6 @@ require ( github.com/openzipkin/zipkin-go v0.4.3 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae - go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/config/configtelemetry v0.111.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/config/configtls v1.17.1-0.20241008154146-ea48c09c31ae go.opentelemetry.io/collector/confmap v1.17.1-0.20241008154146-ea48c09c31ae diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum index 72c2336349c1..92dca05b53e2 100644 --- a/receiver/kafkareceiver/go.sum +++ b/receiver/kafkareceiver/go.sum @@ -126,8 +126,6 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t go.opentelemetry.io/collector v0.111.0 h1:D3LJTYrrK2ac94E2PXPSbVkArqxbklbCLsE4MAJQdRo= go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae h1:dXAMqXGJp1vWG7qwS/2sjIyJgmyOSfEOm6Gcmkzp1cQ= go.opentelemetry.io/collector/component v0.111.1-0.20241008154146-ea48c09c31ae/go.mod h1:iWUfPxpVwZhkI4v3/Gh5wt4iKyJn4lriPFAug8iLXno= -go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae h1:BVTz/s8fmI5UA4Q6zAndl0Pds4RrkhxEXkx9TMelleM= -go.opentelemetry.io/collector/component/componentstatus v0.111.1-0.20241008154146-ea48c09c31ae/go.mod h1:fwY2NdXkOw07ObusogFZChVyyvqXqCJBlGaWwwDJAtI= go.opentelemetry.io/collector/config/configopaque v1.17.1-0.20241008154146-ea48c09c31ae h1:Mh1ZBO6U5X8iXGFBTguBvLBydg+aLuoWX0Ij7QzHU3c= go.opentelemetry.io/collector/config/configopaque v1.17.1-0.20241008154146-ea48c09c31ae/go.mod h1:6zlLIyOoRpJJ+0bEKrlZOZon3rOp5Jrz9fMdR4twOS4= go.opentelemetry.io/collector/config/configretry v1.17.1-0.20241008154146-ea48c09c31ae h1:2iWFdlGM2sRZd2WBrivau/cBZzc5iZMgHFcheoh1xvM= diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 750955366816..48ea87559a56 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -5,14 +5,12 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" - "errors" "fmt" "strconv" "sync" "github.com/IBM/sarama" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" @@ -44,6 +42,7 @@ type kafkaTracesConsumer struct { topics []string cancelConsumeLoop context.CancelFunc unmarshaler TracesUnmarshaler + consumeLoopWG *sync.WaitGroup settings receiver.Settings telemetryBuilder *metadata.TelemetryBuilder @@ -65,6 +64,7 @@ type kafkaMetricsConsumer struct { topics []string cancelConsumeLoop context.CancelFunc unmarshaler MetricsUnmarshaler + consumeLoopWG *sync.WaitGroup settings receiver.Settings telemetryBuilder *metadata.TelemetryBuilder @@ -86,6 +86,7 @@ type kafkaLogsConsumer struct { topics []string cancelConsumeLoop context.CancelFunc unmarshaler LogsUnmarshaler + consumeLoopWG *sync.WaitGroup settings receiver.Settings telemetryBuilder *metadata.TelemetryBuilder @@ -113,6 +114,7 @@ func newTracesReceiver(config Config, set receiver.Settings, nextConsumer consum config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, + consumeLoopWG: &sync.WaitGroup{}, settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, @@ -207,16 +209,14 @@ func (c *kafkaTracesConsumer) Start(_ context.Context, host component.Host) erro headers: c.headers, } } - go func() { - if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - } - }() + c.consumeLoopWG.Add(1) + go c.consumeLoop(ctx, consumerGroup) <-consumerGroup.ready return nil } -func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { + defer c.consumeLoopWG.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -227,7 +227,7 @@ func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.Co // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return ctx.Err() + return } } } @@ -237,6 +237,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error { return nil } c.cancelConsumeLoop() + c.consumeLoopWG.Wait() if c.consumerGroup == nil { return nil } @@ -253,6 +254,7 @@ func newMetricsReceiver(config Config, set receiver.Settings, nextConsumer consu config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, + consumeLoopWG: &sync.WaitGroup{}, settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, @@ -315,16 +317,14 @@ func (c *kafkaMetricsConsumer) Start(_ context.Context, host component.Host) err headers: c.headers, } } - go func() { - if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - } - }() + c.consumeLoopWG.Add(1) + go c.consumeLoop(ctx, metricsConsumerGroup) <-metricsConsumerGroup.ready return nil } -func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { + defer c.consumeLoopWG.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -335,7 +335,7 @@ func (c *kafkaMetricsConsumer) consumeLoop(ctx context.Context, handler sarama.C // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return ctx.Err() + return } } } @@ -345,6 +345,7 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error { return nil } c.cancelConsumeLoop() + c.consumeLoopWG.Wait() if c.consumerGroup == nil { return nil } @@ -361,6 +362,7 @@ func newLogsReceiver(config Config, set receiver.Settings, nextConsumer consumer config: config, topics: []string{config.Topic}, nextConsumer: nextConsumer, + consumeLoopWG: &sync.WaitGroup{}, settings: set, autocommitEnabled: config.AutoCommit.Enable, messageMarking: config.MessageMarking, @@ -426,16 +428,14 @@ func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error headers: c.headers, } } - go func() { - if err := c.consumeLoop(ctx, logsConsumerGroup); err != nil { - componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) - } - }() + c.consumeLoopWG.Add(1) + go c.consumeLoop(ctx, logsConsumerGroup) <-logsConsumerGroup.ready return nil } -func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { +func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) { + defer c.consumeLoopWG.Done() for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be @@ -446,7 +446,7 @@ func (c *kafkaLogsConsumer) consumeLoop(ctx context.Context, handler sarama.Cons // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) - return ctx.Err() + return } } } @@ -456,6 +456,7 @@ func (c *kafkaLogsConsumer) Shutdown(context.Context) error { return nil } c.cancelConsumeLoop() + c.consumeLoopWG.Wait() if c.consumerGroup == nil { return nil } diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index 529196efe26e..61693d58eced 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -96,6 +96,7 @@ func TestTracesReceiverStart(t *testing.T) { c := kafkaTracesConsumer{ config: Config{Encoding: defaultEncoding}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: nopTelemetryBuilder(t), @@ -110,6 +111,7 @@ func TestTracesReceiverStartConsume(t *testing.T) { require.NoError(t, err) c := kafkaTracesConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: telemetryBuilder, @@ -117,11 +119,11 @@ func TestTracesReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err = c.consumeLoop(ctx, &tracesConsumerGroupHandler{ + c.consumeLoopWG.Add(1) + c.consumeLoop(ctx, &tracesConsumerGroupHandler{ ready: make(chan bool), telemetryBuilder: telemetryBuilder, }) - assert.EqualError(t, err, context.Canceled.Error()) } func TestTracesReceiver_error(t *testing.T) { @@ -134,6 +136,7 @@ func TestTracesReceiver_error(t *testing.T) { c := kafkaTracesConsumer{ config: Config{Encoding: defaultEncoding}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -375,6 +378,7 @@ func TestTracesReceiver_encoding_extension(t *testing.T) { c := kafkaTracesConsumer{ config: Config{Encoding: "traces_encoding"}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -449,6 +453,7 @@ func TestMetricsReceiverStartConsume(t *testing.T) { require.NoError(t, err) c := kafkaMetricsConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: telemetryBuilder, @@ -456,11 +461,11 @@ func TestMetricsReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err = c.consumeLoop(ctx, &logsConsumerGroupHandler{ + c.consumeLoopWG.Add(1) + c.consumeLoop(ctx, &logsConsumerGroupHandler{ ready: make(chan bool), telemetryBuilder: telemetryBuilder, }) - assert.EqualError(t, err, context.Canceled.Error()) } func TestMetricsReceiver_error(t *testing.T) { @@ -473,6 +478,7 @@ func TestMetricsReceiver_error(t *testing.T) { c := kafkaMetricsConsumer{ config: Config{Encoding: defaultEncoding}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -712,6 +718,7 @@ func TestMetricsReceiver_encoding_extension(t *testing.T) { c := kafkaMetricsConsumer{ config: Config{Encoding: "metrics_encoding"}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), @@ -787,6 +794,7 @@ func TestLogsReceiverStart(t *testing.T) { c := kafkaLogsConsumer{ config: *createDefaultConfig().(*Config), nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: nopTelemetryBuilder(t), @@ -801,6 +809,7 @@ func TestLogsReceiverStartConsume(t *testing.T) { require.NoError(t, err) c := kafkaLogsConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: receivertest.NewNopSettings(), consumerGroup: &testConsumerGroup{}, telemetryBuilder: telemetryBuilder, @@ -808,11 +817,11 @@ func TestLogsReceiverStartConsume(t *testing.T) { ctx, cancelFunc := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancelFunc require.NoError(t, c.Shutdown(context.Background())) - err = c.consumeLoop(ctx, &logsConsumerGroupHandler{ + c.consumeLoopWG.Add(1) + c.consumeLoop(ctx, &logsConsumerGroupHandler{ ready: make(chan bool), telemetryBuilder: telemetryBuilder, }) - assert.EqualError(t, err, context.Canceled.Error()) } func TestLogsReceiver_error(t *testing.T) { @@ -824,6 +833,7 @@ func TestLogsReceiver_error(t *testing.T) { expectedErr := errors.New("handler error") c := kafkaLogsConsumer{ nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, config: *createDefaultConfig().(*Config), @@ -1188,6 +1198,7 @@ func TestLogsReceiver_encoding_extension(t *testing.T) { c := kafkaLogsConsumer{ config: Config{Encoding: "logs_encoding"}, nextConsumer: consumertest.NewNop(), + consumeLoopWG: &sync.WaitGroup{}, settings: settings, consumerGroup: &testConsumerGroup{err: expectedErr}, telemetryBuilder: nopTelemetryBuilder(t), From e788e319e129071e74c13db48296464a5253413d Mon Sep 17 00:00:00 2001 From: Dakota Paasman <122491662+dpaasman00@users.noreply.github.com> Date: Tue, 22 Oct 2024 09:11:44 -0400 Subject: [PATCH 3/3] [extension/opamp] Add `os.description` as non-identifying agent attribute (#35816) **Description:** The opamp extension now reports additional information about the host machine's operating system, specifically the version. It does so by reporting the semantic convention `os.description`(defined [here](https://opentelemetry.io/docs/specs/semconv/attributes-registry/os/)) as a non-identifying attribute in the agent description message. **Link to tracking Issue:** Closes #35555 **Testing:** Unit tests updated. Verified `os.description` attribute shows up with values like 'macOS 15.0' and 'Ubuntu 20.04.6 LTS' --- .../opampextension-add-os-desc-semconv.yaml | 27 +++++++++++++++++++ cmd/opampsupervisor/e2e_test.go | 3 ++- extension/opampextension/go.mod | 2 +- extension/opampextension/opamp_agent.go | 23 ++++++++++++++++ extension/opampextension/opamp_agent_test.go | 5 ++++ 5 files changed, 58 insertions(+), 2 deletions(-) create mode 100644 .chloggen/opampextension-add-os-desc-semconv.yaml diff --git a/.chloggen/opampextension-add-os-desc-semconv.yaml b/.chloggen/opampextension-add-os-desc-semconv.yaml new file mode 100644 index 000000000000..af432b44136e --- /dev/null +++ b/.chloggen/opampextension-add-os-desc-semconv.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opampextension + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Report OS description semantic convention (`os.description`) as a part of non-identifying agent description. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35555] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 1e5cd4e36507..5b6a7ac4adbf 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -752,7 +752,8 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) { }, } - require.Equal(t, expectedDescription, ad.AgentDescription) + require.Subset(t, ad.AgentDescription.IdentifyingAttributes, expectedDescription.IdentifyingAttributes) + require.Subset(t, ad.AgentDescription.NonIdentifyingAttributes, expectedDescription.NonIdentifyingAttributes) time.Sleep(250 * time.Millisecond) } diff --git a/extension/opampextension/go.mod b/extension/opampextension/go.mod index 4bc064b766df..23be0a992b71 100644 --- a/extension/opampextension/go.mod +++ b/extension/opampextension/go.mod @@ -60,7 +60,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.17.0 // indirect + golang.org/x/text v0.17.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect google.golang.org/protobuf v1.35.1 // indirect ) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index f984974c6c88..719e09b4d764 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -19,6 +19,7 @@ import ( "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" + "github.com/shirou/gopsutil/v4/host" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/confmap" @@ -27,6 +28,8 @@ import ( semconv "go.opentelemetry.io/collector/semconv/v1.27.0" "go.uber.org/zap" "golang.org/x/exp/maps" + "golang.org/x/text/cases" + "golang.org/x/text/language" "gopkg.in/yaml.v3" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages" @@ -278,6 +281,7 @@ func (o *opampAgent) createAgentDescription() error { if err != nil { return err } + description := getOSDescription(o.logger) ident := []*protobufs.KeyValue{ stringKeyValue(semconv.AttributeServiceInstanceID, o.instanceID.String()), @@ -291,6 +295,7 @@ func (o *opampAgent) createAgentDescription() error { nonIdentifyingAttributeMap[semconv.AttributeOSType] = runtime.GOOS nonIdentifyingAttributeMap[semconv.AttributeHostArch] = runtime.GOARCH nonIdentifyingAttributeMap[semconv.AttributeHostName] = hostname + nonIdentifyingAttributeMap[semconv.AttributeOSDescription] = description for k, v := range o.cfg.AgentDescription.NonIdentifyingAttributes { nonIdentifyingAttributeMap[k] = v @@ -367,3 +372,21 @@ func (o *opampAgent) setHealth(ch *protobufs.ComponentHealth) { } } } + +func getOSDescription(logger *zap.Logger) string { + info, err := host.Info() + if err != nil { + logger.Error("failed getting host info", zap.Error(err)) + return runtime.GOOS + } + switch runtime.GOOS { + case "darwin": + return "macOS " + info.PlatformVersion + case "linux": + return cases.Title(language.English).String(info.Platform) + " " + info.PlatformVersion + case "windows": + return info.Platform + " " + info.PlatformVersion + default: + return runtime.GOOS + } +} diff --git a/extension/opampextension/opamp_agent_test.go b/extension/opampextension/opamp_agent_test.go index fd72d346492c..a0f18d8f778e 100644 --- a/extension/opampextension/opamp_agent_test.go +++ b/extension/opampextension/opamp_agent_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/extension/extensiontest" semconv "go.opentelemetry.io/collector/semconv/v1.27.0" + "go.uber.org/zap" ) func TestNewOpampAgent(t *testing.T) { @@ -53,6 +54,7 @@ func TestNewOpampAgentAttributes(t *testing.T) { func TestCreateAgentDescription(t *testing.T) { hostname, err := os.Hostname() require.NoError(t, err) + description := getOSDescription(zap.NewNop()) serviceName := "otelcol-distrot" serviceVersion := "distro.0" @@ -76,6 +78,7 @@ func TestCreateAgentDescription(t *testing.T) { NonIdentifyingAttributes: []*protobufs.KeyValue{ stringKeyValue(semconv.AttributeHostArch, runtime.GOARCH), stringKeyValue(semconv.AttributeHostName, hostname), + stringKeyValue(semconv.AttributeOSDescription, description), stringKeyValue(semconv.AttributeOSType, runtime.GOOS), }, }, @@ -99,6 +102,7 @@ func TestCreateAgentDescription(t *testing.T) { stringKeyValue(semconv.AttributeHostArch, runtime.GOARCH), stringKeyValue(semconv.AttributeHostName, hostname), stringKeyValue(semconv.AttributeK8SPodName, "my-very-cool-pod"), + stringKeyValue(semconv.AttributeOSDescription, description), stringKeyValue(semconv.AttributeOSType, runtime.GOOS), }, }, @@ -119,6 +123,7 @@ func TestCreateAgentDescription(t *testing.T) { NonIdentifyingAttributes: []*protobufs.KeyValue{ stringKeyValue(semconv.AttributeHostArch, runtime.GOARCH), stringKeyValue(semconv.AttributeHostName, "override-host"), + stringKeyValue(semconv.AttributeOSDescription, description), stringKeyValue(semconv.AttributeOSType, runtime.GOOS), }, },