diff --git a/internal/converter/internal/otelcolconvert/converter_servicegraphconnector.go b/internal/converter/internal/otelcolconvert/converter_servicegraphconnector.go
new file mode 100644
index 000000000000..1211e6184347
--- /dev/null
+++ b/internal/converter/internal/otelcolconvert/converter_servicegraphconnector.go
@@ -0,0 +1,67 @@
+package otelcolconvert
+
+import (
+	"fmt"
+
+	"github.com/grafana/agent/internal/component/otelcol"
+	"github.com/grafana/agent/internal/component/otelcol/connector/servicegraph"
+	"github.com/grafana/agent/internal/converter/diag"
+	"github.com/grafana/agent/internal/converter/internal/common"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector"
+	"go.opentelemetry.io/collector/component"
+)
+
+func init() {
+	converters = append(converters, servicegraphConnectorConverter{})
+}
+
+type servicegraphConnectorConverter struct{}
+
+func (servicegraphConnectorConverter) Factory() component.Factory {
+	return servicegraphconnector.NewFactory()
+}
+
+func (servicegraphConnectorConverter) InputComponentName() string {
+	return "otelcol.connector.servicegraph"
+}
+
+func (servicegraphConnectorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
+	var diags diag.Diagnostics
+
+	label := state.FlowComponentLabel()
+
+	args := toServicegraphConnector(state, id, cfg.(*servicegraphconnector.Config))
+	block := common.NewBlockWithOverride([]string{"otelcol", "connector", "servicegraph"}, label, args)
+
+	diags.Add(
+		diag.SeverityLevelInfo,
+		fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
+	)
+
+	state.Body().AppendBlock(block)
+	return diags
+}
+
+func toServicegraphConnector(state *State, id component.InstanceID, cfg *servicegraphconnector.Config) *servicegraph.Arguments {
+	if cfg == nil {
+		return nil
+	}
+	var (
+		nextMetrics = state.Next(id, component.DataTypeMetrics)
+	)
+
+	return &servicegraph.Arguments{
+		LatencyHistogramBuckets: cfg.LatencyHistogramBuckets,
+		Dimensions:              cfg.Dimensions,
+		Store: servicegraph.StoreConfig{
+			MaxItems: cfg.Store.MaxItems,
+			TTL:      cfg.Store.TTL,
+		},
+		CacheLoop:            cfg.CacheLoop,
+		StoreExpirationLoop:  cfg.StoreExpirationLoop,
+		MetricsFlushInterval: cfg.MetricsFlushInterval,
+		Output: &otelcol.ConsumerArguments{
+			Metrics: ToTokenizedConsumers(nextMetrics),
+		},
+	}
+}
diff --git a/internal/converter/internal/otelcolconvert/pipeline_group.go b/internal/converter/internal/otelcolconvert/pipeline_group.go
index 3c6f278aad38..b1b185fecbe3 100644
--- a/internal/converter/internal/otelcolconvert/pipeline_group.go
+++ b/internal/converter/internal/otelcolconvert/pipeline_group.go
@@ -175,6 +175,11 @@ func (group pipelineGroup) NextTraces(fromID component.InstanceID) []component.I
 func nextInPipeline(pipeline *pipelines.PipelineConfig, fromID component.InstanceID) []component.InstanceID {
 	switch fromID.Kind {
 	case component.KindReceiver, component.KindConnector:
+		// Validate this receiver or connector is part of the pipeline.
+		if !findInComponentIds(fromID, pipeline.Receivers) {
+			return nil
+		}
+
 		// Receivers and connectors should either send to the first processor
 		// if one exists or to every exporter otherwise.
 		if len(pipeline.Processors) > 0 {
@@ -183,6 +188,11 @@ func nextInPipeline(pipeline *pipelines.PipelineConfig, fromID component.Instanc
 		return toComponentInstanceIDs(component.KindExporter, pipeline.Exporters)
 
 	case component.KindProcessor:
+		// Validate this processor is part of the pipeline.
+		if !findInComponentIds(fromID, pipeline.Processors) {
+			return nil
+		}
+
 		// Processors should send to the next processor if one exists or to every
 		// exporter otherwise.
 		processorIndex := slices.Index(pipeline.Processors, fromID.ID)
@@ -217,3 +227,12 @@ func toComponentInstanceIDs(kind component.Kind, ids []component.ID) []component
 
 	return res
 }
+
+func findInComponentIds(fromID component.InstanceID, componentIDs []component.ID) bool {
+	for _, id := range componentIDs {
+		if fromID.ID == id {
+			return true
+		}
+	}
+	return false
+}
diff --git a/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.river b/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.river
index 141cfb77953f..199156c6ac43 100644
--- a/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.river
+++ b/internal/converter/internal/otelcolconvert/testdata/inconsistent_processor.river
@@ -12,9 +12,9 @@ otelcol.receiver.otlp "default" {
 
 otelcol.processor.batch "default" {
 	output {
-		metrics = [otelcol.exporter.otlp.default.input]
+		metrics = []
 		logs    = [otelcol.exporter.otlp.default.input]
-		traces  = [otelcol.exporter.otlp.default.input]
+		traces  = []
 	}
 }
 
diff --git a/internal/converter/internal/otelcolconvert/testdata/servicegraph.river b/internal/converter/internal/otelcolconvert/testdata/servicegraph.river
new file mode 100644
index 000000000000..96aeccfaff97
--- /dev/null
+++ b/internal/converter/internal/otelcolconvert/testdata/servicegraph.river
@@ -0,0 +1,37 @@
+otelcol.receiver.otlp "default" {
+	grpc { }
+
+	http { }
+
+	output {
+		metrics = []
+		logs    = []
+		traces  = [otelcol.connector.servicegraph.default.input]
+	}
+}
+
+otelcol.exporter.otlp "default" {
+	sending_queue {
+		queue_size = 5000
+	}
+
+	client {
+		endpoint = "database:4317"
+	}
+}
+
+otelcol.connector.servicegraph "default" {
+	latency_histogram_buckets = ["100ms", "250ms", "1s", "5s", "10s"]
+	dimensions                = ["dimension-1", "dimension-2"]
+
+	store {
+		max_items = 10
+		ttl       = "1s"
+	}
+	cache_loop            = "2m0s"
+	store_expiration_loop = "5s"
+
+	output {
+		metrics = [otelcol.exporter.otlp.default.input]
+	}
+}
diff --git a/internal/converter/internal/otelcolconvert/testdata/servicegraph.yaml b/internal/converter/internal/otelcolconvert/testdata/servicegraph.yaml
new file mode 100644
index 000000000000..f033a9022d6f
--- /dev/null
+++ b/internal/converter/internal/otelcolconvert/testdata/servicegraph.yaml
@@ -0,0 +1,35 @@
+receivers:
+  otlp:
+    protocols:
+      grpc:
+      http:
+
+connectors:
+  servicegraph:
+    latency_histogram_buckets: [100ms, 250ms, 1s, 5s, 10s]
+    dimensions:
+      - dimension-1
+      - dimension-2
+    store:
+      ttl: 1s
+      max_items: 10
+    cache_loop: 2m
+    store_expiration_loop: 5s
+
+exporters:
+  otlp:
+    # Our defaults have drifted from upstream, so we explicitly set our
+    # defaults below (balancer_name and queue_size).
+    endpoint: database:4317
+    balancer_name: pick_first
+    sending_queue:
+      queue_size: 5000
+
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      exporters: [servicegraph]
+    metrics:
+      receivers: [servicegraph]
+      exporters: [otlp]
diff --git a/internal/converter/internal/otelcolconvert/testdata/spanmetrics.river b/internal/converter/internal/otelcolconvert/testdata/spanmetrics.river
index 655a859f8e92..7cb422b5b096 100644
--- a/internal/converter/internal/otelcolconvert/testdata/spanmetrics.river
+++ b/internal/converter/internal/otelcolconvert/testdata/spanmetrics.river
@@ -4,7 +4,7 @@ otelcol.receiver.otlp "default" {
 	http { }
 
 	output {
-		metrics = [otelcol.exporter.otlp.default.input]
+		metrics = []
 		logs    = [otelcol.exporter.otlp.default.input]
 		traces  = [otelcol.connector.spanmetrics.default.input]
 	}
diff --git a/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.river b/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.river
index ad20c4553708..f2806e3d12df 100644
--- a/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.river
+++ b/internal/converter/internal/otelcolconvert/testdata/spanmetrics_full.river
@@ -4,7 +4,7 @@ otelcol.receiver.otlp "default_traces" {
 	http { }
 
 	output {
-		metrics = [otelcol.exporter.otlp.default_metrics_backend.input]
+		metrics = []
 		logs    = []
 		traces  = [otelcol.exporter.otlp.default_traces_backend.input, otelcol.connector.spanmetrics.default.input]
 	}