backport spawn otel receivers on need (#6898)
* backport spawn otel receivers on need

* remove default value for topic
wildum authored May 15, 2024
1 parent 93b2ebf commit 52b31ac
Showing 7 changed files with 155 additions and 29 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -89,6 +89,8 @@ Main (unreleased)

- Fix a bug with the logs pipeline in static mode which prevented it from shutting down cleanly.

- Fix a bug where a topic was claimed by the wrong consumer type in `otelcol.receiver.kafka`. (@wildum)

### Other changes

- Clustering for Grafana Agent in Flow mode has graduated from beta to stable.
12 changes: 11 additions & 1 deletion docs/sources/flow/reference/components/otelcol.receiver.kafka.md
@@ -45,13 +45,23 @@ Name | Type | Description | Default | Required
---- | ---- | ----------- | ------- | --------
`brokers` | `array(string)` | Kafka brokers to connect to. | | yes
`protocol_version` | `string` | Kafka protocol version to use. | | yes
`topic` | `string` | Kafka topic to read from. | `"otlp_spans"` | no
`topic` | `string` | Kafka topic to read from. | | no
`encoding` | `string` | Encoding of payload read from Kafka. | `"otlp_proto"` | no
`group_id` | `string` | Consumer group to consume messages from. | `"otel-collector"` | no
`client_id` | `string` | Consumer client ID to use. | `"otel-collector"` | no
`initial_offset` | `string` | Initial offset to use if no offset was previously committed. | `"latest"` | no
`resolve_canonical_bootstrap_servers_only` | `bool` | Whether to resolve then reverse-lookup broker IPs during startup. | `"false"` | no

If `topic` is not set, different topics will be used for different telemetry signals:

* Metrics will be received from an `otlp_metrics` topic.
* Traces will be received from an `otlp_spans` topic.
* Logs will be received from an `otlp_logs` topic.

If `topic` is set explicitly, only the signal type that corresponds to the data stored in the topic may be set in the output block.
For example, if `topic` is set to `"my_telemetry"`, the `"my_telemetry"` topic can only contain one signal type: metrics, logs, or traces.
If it contains only metrics, `otelcol.receiver.kafka` should be configured to output only metrics.
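
For illustration, here is a minimal configuration sketch where `topic` is set and only traces are forwarded. The broker address and the `otelcol.exporter.otlp.default.input` forwarding target are assumptions for this example:

```river
otelcol.receiver.kafka "traces_only" {
  brokers          = ["localhost:9092"]
  protocol_version = "2.0.0"
  topic            = "my_telemetry"

  output {
    // Because `topic` is set explicitly, only one signal type may be
    // configured here; adding logs or metrics as well would fail validation.
    traces = [otelcol.exporter.otlp.default.input]
  }
}
```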

The `encoding` argument determines how to decode messages read from Kafka.
`encoding` must be one of the following strings:

24 changes: 23 additions & 1 deletion internal/component/otelcol/receiver/kafka/kafka.go
@@ -2,6 +2,8 @@
package kafka

import (
"fmt"
"strings"
"time"

"github.com/grafana/agent/internal/component"
@@ -63,7 +65,6 @@ func (args *Arguments) SetToDefault() {
// for compatibility, even though that means using a client and group ID of
// "otel-collector".

Topic: "otlp_spans",
Encoding: "otlp_proto",
Brokers: []string{"localhost:9092"},
ClientID: "otel-collector",
@@ -77,6 +78,27 @@
args.DebugMetrics.SetToDefault()
}

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
var signals []string

if len(args.Topic) > 0 {
if len(args.Output.Logs) > 0 {
signals = append(signals, "logs")
}
if len(args.Output.Metrics) > 0 {
signals = append(signals, "metrics")
}
if len(args.Output.Traces) > 0 {
signals = append(signals, "traces")
}
if len(signals) > 1 {
return fmt.Errorf("only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: %s", strings.Join(signals, ", "))
}
}
return nil
}

// Convert implements receiver.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
input := make(map[string]interface{})
31 changes: 26 additions & 5 deletions internal/component/otelcol/receiver/kafka/kafka_test.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/grafana/agent/internal/component/otelcol"
"github.com/grafana/agent/internal/component/otelcol/internal/fakeconsumer"
"github.com/grafana/agent/internal/component/otelcol/receiver/kafka"
"github.com/grafana/river"
"github.com/mitchellh/mapstructure"
@@ -29,7 +30,6 @@ func TestArguments_UnmarshalRiver(t *testing.T) {
expected: kafkareceiver.Config{
Brokers: []string{"10.10.10.10:9092"},
ProtocolVersion: "2.0.0",
Topic: "otlp_spans",
Encoding: "otlp_proto",
GroupID: "otel-collector",
ClientID: "otel-collector",
@@ -153,7 +153,6 @@ func TestArguments_Auth(t *testing.T) {
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
@@ -205,7 +204,6 @@ func TestArguments_Auth(t *testing.T) {
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
@@ -263,7 +261,6 @@ func TestArguments_Auth(t *testing.T) {
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
@@ -320,7 +317,6 @@ func TestArguments_Auth(t *testing.T) {
expected: map[string]interface{}{
"brokers": []string{"10.10.10.10:9092"},
"protocol_version": "2.0.0",
"topic": "otlp_spans",
"encoding": "otlp_proto",
"group_id": "otel-collector",
"client_id": "otel-collector",
@@ -433,3 +429,28 @@ func TestDebugMetricsConfig(t *testing.T) {
})
}
}

func TestArguments_Validate(t *testing.T) {
cfg := `
brokers = ["10.10.10.10:9092"]
protocol_version = "2.0.0"
topic = "traces"
output {
}
`
var args kafka.Arguments
require.NoError(t, river.Unmarshal([]byte(cfg), &args))

// Adding two traces consumers; expect no error
args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{})
args.Output.Traces = append(args.Output.Traces, &fakeconsumer.Consumer{})
require.NoError(t, args.Validate())

// Adding another signal type
args.Output.Logs = append(args.Output.Logs, &fakeconsumer.Consumer{})
require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, traces")

// Adding a third signal type
args.Output.Metrics = append(args.Output.Metrics, &fakeconsumer.Consumer{})
require.ErrorContains(t, args.Validate(), "only one signal can be set in the output block when a Kafka topic is explicitly set; currently set signals: logs, metrics, traces")
}
46 changes: 25 additions & 21 deletions internal/component/otelcol/receiver/receiver.go
@@ -149,36 +149,40 @@ func (r *Receiver) Update(args component.Arguments) error {
return err
}

var (
next = rargs.NextConsumers()
nextTraces = fanoutconsumer.Traces(next.Traces)
nextMetrics = fanoutconsumer.Metrics(next.Metrics)
nextLogs = fanoutconsumer.Logs(next.Logs)
)
next := rargs.NextConsumers()

// Create instances of the receiver from our factory for each of our
// supported telemetry signals.
var components []otelcomponent.Component

tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesReceiver != nil {
components = append(components, tracesReceiver)
if len(next.Traces) > 0 {
nextTraces := fanoutconsumer.Traces(next.Traces)
tracesReceiver, err := r.factory.CreateTracesReceiver(r.ctx, settings, receiverConfig, nextTraces)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if tracesReceiver != nil {
components = append(components, tracesReceiver)
}
}

metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if metricsReceiver != nil {
components = append(components, metricsReceiver)
if len(next.Metrics) > 0 {
nextMetrics := fanoutconsumer.Metrics(next.Metrics)
metricsReceiver, err := r.factory.CreateMetricsReceiver(r.ctx, settings, receiverConfig, nextMetrics)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if metricsReceiver != nil {
components = append(components, metricsReceiver)
}
}

logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if logsReceiver != nil {
components = append(components, logsReceiver)
if len(next.Logs) > 0 {
nextLogs := fanoutconsumer.Logs(next.Logs)
logsReceiver, err := r.factory.CreateLogsReceiver(r.ctx, settings, receiverConfig, nextLogs)
if err != nil && !errors.Is(err, otelcomponent.ErrDataTypeIsNotSupported) {
return err
} else if logsReceiver != nil {
components = append(components, logsReceiver)
}
}

// Schedule the components to run once our component is running.
68 changes: 68 additions & 0 deletions internal/component/otelcol/receiver/receiver_test.go
@@ -57,6 +57,74 @@ func TestReceiver(t *testing.T) {
require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")
}

func TestReceiverNotStarted(t *testing.T) {
var (
waitConsumerTrigger = util.NewWaitTrigger()
onTracesConsumer = func(t otelconsumer.Traces) {
waitConsumerTrigger.Trigger()
}
)
te := newTestEnvironment(t, onTracesConsumer)
te.Start(fakeReceiverArgs{
Output: &otelcol.ConsumerArguments{},
})

// Check that no trace receiver was started because it's not needed by the output.
require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded")
}

func TestReceiverUpdate(t *testing.T) {
var (
consumer otelconsumer.Traces

waitConsumerTrigger = util.NewWaitTrigger()
onTracesConsumer = func(t otelconsumer.Traces) {
consumer = t
waitConsumerTrigger.Trigger()
}

waitTracesTrigger = util.NewWaitTrigger()
nextConsumer = &fakeconsumer.Consumer{
ConsumeTracesFunc: func(context.Context, ptrace.Traces) error {
waitTracesTrigger.Trigger()
return nil
},
}
)

te := newTestEnvironment(t, onTracesConsumer)
te.Start(fakeReceiverArgs{
Output: &otelcol.ConsumerArguments{},
})

// Check that no trace receiver was started because it's not needed by the output.
require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded")

te.Controller.Update(fakeReceiverArgs{
Output: &otelcol.ConsumerArguments{
Traces: []otelcol.Consumer{nextConsumer},
},
})

// Now the trace receiver is started.
require.NoError(t, waitConsumerTrigger.Wait(time.Second), "no traces consumer sent")

err := consumer.ConsumeTraces(context.Background(), ptrace.NewTraces())
require.NoError(t, err)

require.NoError(t, waitTracesTrigger.Wait(time.Second), "consumer did not get invoked")

waitConsumerTrigger = util.NewWaitTrigger()

// Remove the trace receiver.
te.Controller.Update(fakeReceiverArgs{
Output: &otelcol.ConsumerArguments{},
})

// Check that after the update no trace receiver is started.
require.ErrorContains(t, waitConsumerTrigger.Wait(time.Second), "context deadline exceeded")
}

type testEnvironment struct {
t *testing.T

@@ -1,7 +1,6 @@
otelcol.receiver.kafka "default" {
brokers = ["broker:9092"]
protocol_version = "2.0.0"
topic = ""

authentication {
plaintext {
