diff --git a/internal/converter/internal/otelcolconvert/otelcolconvert.go b/internal/converter/internal/otelcolconvert/otelcolconvert.go
index d24e072b94f1..d06fc78882c2 100644
--- a/internal/converter/internal/otelcolconvert/otelcolconvert.go
+++ b/internal/converter/internal/otelcolconvert/otelcolconvert.go
@@ -291,28 +291,39 @@ func validateNoDuplicateReceivers(groups []pipelineGroup, connectorIDs []compone
 func buildConverterTable(extraConverters []ComponentConverter) map[converterKey]ComponentConverter {
 	table := make(map[converterKey]ComponentConverter)
-	allConverters := append(converters, extraConverters...)
+
+	// Ordering is critical here because conflicting converters are resolved with
+	// the first one in the list winning.
+	allConverters := append(extraConverters, converters...)

 	for _, conv := range allConverters {
 		fact := conv.Factory()
-
+		var kinds []component.Kind
 		switch fact.(type) {
 		case receiver.Factory:
-			table[converterKey{Kind: component.KindReceiver, Type: fact.Type()}] = conv
+			kinds = append(kinds, component.KindReceiver)
 		case processor.Factory:
-			table[converterKey{Kind: component.KindProcessor, Type: fact.Type()}] = conv
+			kinds = append(kinds, component.KindProcessor)
 		case exporter.Factory:
-			table[converterKey{Kind: component.KindExporter, Type: fact.Type()}] = conv
+			kinds = append(kinds, component.KindExporter)
 		case connector.Factory:
-			table[converterKey{Kind: component.KindConnector, Type: fact.Type()}] = conv
+			kinds = append(kinds, component.KindConnector)
 			// We need this so the connector is available as a destination for state.Next
-			table[converterKey{Kind: component.KindExporter, Type: fact.Type()}] = conv
+			kinds = append(kinds, component.KindExporter)
 			// Technically, this isn't required to be here since the entry
 			// won't be needed to look up a destination for state.Next, but
 			// it's added to reinforce the idea of how connectors are used.
-			table[converterKey{Kind: component.KindReceiver, Type: fact.Type()}] = conv
+			kinds = append(kinds, component.KindReceiver)
 		case extension.Factory:
-			table[converterKey{Kind: component.KindExtension, Type: fact.Type()}] = conv
+			kinds = append(kinds, component.KindExtension)
+		}
+
+		for _, kind := range kinds {
+			// If a converter for this kind and type already exists, skip it.
+			if _, ok := table[converterKey{Kind: kind, Type: fact.Type()}]; ok {
+				continue
+			}
+			table[converterKey{Kind: kind, Type: fact.Type()}] = conv
 		}
 	}
diff --git a/internal/converter/internal/staticconvert/internal/build/builder_traces.go b/internal/converter/internal/staticconvert/internal/build/builder_traces.go
index da2411b9eaa3..1122ae9cec1b 100644
--- a/internal/converter/internal/staticconvert/internal/build/builder_traces.go
+++ b/internal/converter/internal/staticconvert/internal/build/builder_traces.go
@@ -7,9 +7,11 @@ import (
 	"github.com/grafana/agent/internal/converter/diag"
 	"github.com/grafana/agent/internal/converter/internal/otelcolconvert"
 	"github.com/grafana/agent/internal/static/traces"
+	"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector"
 	otel_component "go.opentelemetry.io/collector/component"
 	"go.opentelemetry.io/collector/exporter/loggingexporter"
 	"go.opentelemetry.io/collector/otelcol"
+	"go.opentelemetry.io/collector/service/pipelines"
 )

 // List of component converters. This slice is appended to by init functions in
@@ -38,6 +40,7 @@ func (b *ConfigBuilder) appendTraces() {
 		removeReceiver(otelCfg, "traces", "push_receiver")

 		b.translateAutomaticLogging(otelCfg, cfg)
+		b.translateSpanMetrics(otelCfg, cfg)

 		b.diags.AddAll(otelcolconvert.AppendConfig(b.f, otelCfg, labelPrefix, converters))
 	}
@@ -68,6 +71,74 @@ func (b *ConfigBuilder) translateAutomaticLogging(otelCfg *otelcol.Config, cfg t
 	removeProcessor(otelCfg, "traces", "automatic_logging")
 }

+func (b *ConfigBuilder) translateSpanMetrics(otelCfg *otelcol.Config, cfg traces.InstanceConfig) {
+	if _, ok := otelCfg.Processors[otel_component.NewID("spanmetrics")]; !ok {
+		return
+	}
+
+	// Remove the custom otel components and delete the custom metrics pipeline
+	removeProcessor(otelCfg, "traces", "spanmetrics")
+	removeReceiver(otelCfg, "metrics", "noop")
+	removeExporter(otelCfg, "metrics", "prometheus")
+	removePipeline(otelCfg, "metrics", "spanmetrics")
+
+	// If the spanmetrics configuration includes a handler_endpoint, we cannot convert it.
+	// This check intentionally runs after the section above which removes the custom spanmetrics
+	// processor, so that the rest of the configuration can still be converted alongside the error.
+	if cfg.SpanMetrics.HandlerEndpoint != "" {
+		b.diags.Add(diag.SeverityLevelError, "Cannot convert using configuration including spanmetrics handler_endpoint. "+
+			"No equivalent exists for exposing a known /metrics endpoint. You can use metrics_instance instead to enable conversion.")
+		return
+	}
+
+	// Add the spanmetrics connector to the otel config with the converted configuration
+	if otelCfg.Connectors == nil {
+		otelCfg.Connectors = map[otel_component.ID]otel_component.Config{}
+	}
+	otelCfg.Connectors[otel_component.NewID("spanmetrics")] = toSpanmetricsConnector(cfg.SpanMetrics)
+
+	// Add the spanmetrics connector to each traces pipeline as an exporter and create metrics pipelines.
+	// The processing order for the span metrics connector differs from the static pipelines since tail sampling
+	// in static mode processes after the custom span metrics processor. This is OK because the tail sampling
+	// processor does not process metrics.
+	spanmetricsID := otel_component.NewID("spanmetrics")
+	remoteWriteID := otel_component.NewID("remote_write")
+	for ix, pipeline := range otelCfg.Service.Pipelines {
+		if ix.Type() == "traces" {
+			pipeline.Exporters = append(pipeline.Exporters, spanmetricsID)
+
+			metricsID := otel_component.NewIDWithName("metrics", ix.Name())
+			otelCfg.Service.Pipelines[metricsID] = &pipelines.PipelineConfig{}
+			otelCfg.Service.Pipelines[metricsID].Receivers = append(otelCfg.Service.Pipelines[metricsID].Receivers, spanmetricsID)
+			otelCfg.Service.Pipelines[metricsID].Exporters = append(otelCfg.Service.Pipelines[metricsID].Exporters, remoteWriteID)
+		}
+	}
+}
+
+func toSpanmetricsConnector(cfg *traces.SpanMetricsConfig) *spanmetricsconnector.Config {
+	smc := spanmetricsconnector.NewFactory().CreateDefaultConfig().(*spanmetricsconnector.Config)
+	for _, dim := range cfg.Dimensions {
+		smc.Dimensions = append(smc.Dimensions, spanmetricsconnector.Dimension{Name: dim.Name, Default: dim.Default})
+	}
+	if cfg.DimensionsCacheSize != 0 {
+		smc.DimensionsCacheSize = cfg.DimensionsCacheSize
+	}
+	if cfg.AggregationTemporality != "" {
+		smc.AggregationTemporality = cfg.AggregationTemporality
+	}
+	if len(cfg.LatencyHistogramBuckets) != 0 {
+		smc.Histogram.Explicit = &spanmetricsconnector.ExplicitHistogramConfig{Buckets: cfg.LatencyHistogramBuckets}
+	}
+	if cfg.MetricsFlushInterval != 0 {
+		smc.MetricsFlushInterval = cfg.MetricsFlushInterval
+	}
+	if cfg.Namespace != "" {
+		smc.Namespace = cfg.Namespace
+	}
+
+	return smc
+}
+
 // removeReceiver removes a receiver from the otel config for a specific pipeline type.
 func removeReceiver(otelCfg *otelcol.Config, pipelineType otel_component.Type, receiverType otel_component.Type) {
 	if _, ok := otelCfg.Receivers[otel_component.NewID(receiverType)]; !ok {
@@ -111,3 +182,30 @@ func removeProcessor(otelCfg *otelcol.Config, pipelineType otel_component.Type,
 		otelCfg.Service.Pipelines[ix].Processors = spr
 	}
 }
+
+// removeExporter removes an exporter from the otel config for a specific pipeline type.
+func removeExporter(otelCfg *otelcol.Config, pipelineType otel_component.Type, exporterType otel_component.Type) {
+	if _, ok := otelCfg.Exporters[otel_component.NewID(exporterType)]; !ok {
+		return
+	}
+
+	delete(otelCfg.Exporters, otel_component.NewID(exporterType))
+	for ix, p := range otelCfg.Service.Pipelines {
+		if ix.Type() != pipelineType {
+			continue
+		}
+
+		spr := make([]otel_component.ID, 0)
+		for _, r := range p.Exporters {
+			if r.Type() != exporterType {
+				spr = append(spr, r)
+			}
+		}
+		otelCfg.Service.Pipelines[ix].Exporters = spr
+	}
+}
+
+// removePipeline removes a pipeline from the otel config for a specific pipeline type.
+func removePipeline(otelCfg *otelcol.Config, pipelineType otel_component.Type, pipelineName string) {
+	delete(otelCfg.Service.Pipelines, otel_component.NewIDWithName(pipelineType, pipelineName))
+}
diff --git a/internal/converter/internal/staticconvert/internal/build/converter_discoveryprocessor.go b/internal/converter/internal/staticconvert/internal/build/converter_discoveryprocessor.go
index 42142dc610e6..6739d13479f5 100644
--- a/internal/converter/internal/staticconvert/internal/build/converter_discoveryprocessor.go
+++ b/internal/converter/internal/staticconvert/internal/build/converter_discoveryprocessor.go
@@ -13,7 +13,6 @@ import (
 	"github.com/grafana/agent/internal/converter/internal/prometheusconvert/build"
 	prometheus_component "github.com/grafana/agent/internal/converter/internal/prometheusconvert/component"
 	"github.com/grafana/agent/internal/static/traces/promsdprocessor"
-	"github.com/grafana/river/scanner"
 	prom_config "github.com/prometheus/prometheus/config"
 	"go.opentelemetry.io/collector/component"
 	"gopkg.in/yaml.v3"
@@ -81,7 +80,7 @@ func toDiscoveryProcessor(state *otelcolconvert.State, id component.InstanceID,
 		if label != "" {
 			labelConcat = label + "_" + scrapeConfig.JobName
 		}
-		label, _ := scanner.SanitizeIdentifier(labelConcat)
+		label := common.SanitizeIdentifierPanics(labelConcat)
 		scrapeTargets := prometheusconvert.AppendServiceDiscoveryConfigs(pb, scrapeConfig.ServiceDiscoveryConfigs, label)
 		promDiscoveryRelabelExports := prometheus_component.AppendDiscoveryRelabel(pb, scrapeConfig.RelabelConfigs, scrapeTargets, label)
 		if promDiscoveryRelabelExports != nil {
diff --git a/internal/converter/internal/staticconvert/internal/build/converter_remotewriteexporter.go b/internal/converter/internal/staticconvert/internal/build/converter_remotewriteexporter.go
new file mode 100644
index 000000000000..7d17bcfa1d1c
--- /dev/null
+++ b/internal/converter/internal/staticconvert/internal/build/converter_remotewriteexporter.go
@@ -0,0 +1,102 @@
+package build
+
+import (
+	"fmt"
+	"sort"
+
+	flow_relabel "github.com/grafana/agent/internal/component/common/relabel"
+	"github.com/grafana/agent/internal/component/otelcol/exporter/prometheus"
+	"github.com/grafana/agent/internal/component/prometheus/relabel"
+	"github.com/grafana/agent/internal/converter/diag"
+	"github.com/grafana/agent/internal/converter/internal/common"
+	"github.com/grafana/agent/internal/converter/internal/otelcolconvert"
+	"github.com/grafana/agent/internal/converter/internal/prometheusconvert/build"
+	prometheus_component "github.com/grafana/agent/internal/converter/internal/prometheusconvert/component"
+	"github.com/grafana/agent/internal/static/traces/remotewriteexporter"
+	prom_relabel "github.com/prometheus/prometheus/model/relabel"
+	"github.com/prometheus/prometheus/storage"
+	"go.opentelemetry.io/collector/component"
+)
+
+func init() {
+	converters = append(converters, remoteWriteExporterConverter{})
+}
+
+type remoteWriteExporterConverter struct{}
+
+func (remoteWriteExporterConverter) Factory() component.Factory {
+	return remotewriteexporter.NewFactory()
+}
+
+func (remoteWriteExporterConverter) InputComponentName() string {
+	return "otelcol.exporter.prometheus"
+}
+
+func (remoteWriteExporterConverter) ConvertAndAppend(state *otelcolconvert.State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
+	label := state.FlowComponentLabel()
+
+	// The prometheus.remote_write component label is derived from the exporter's PromInstance name.
+	rwLabel := "metrics_" + cfg.(*remotewriteexporter.Config).PromInstance
+	forwardTo := []storage.Appendable{common.ConvertAppendable{Expr: fmt.Sprintf("prometheus.remote_write.%s.receiver", rwLabel)}}
+	if len(cfg.(*remotewriteexporter.Config).ConstLabels) > 0 {
+		exports := includeRelabelConfig(label, cfg, state, forwardTo)
+		forwardTo = []storage.Appendable{exports.Receiver}
+	}
+
+	args := toRemoteWriteExporterConfig(cfg.(*remotewriteexporter.Config), forwardTo)
+	block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "prometheus"}, label, args)
+
+	var diags diag.Diagnostics
+	diags.Add(
+		diag.SeverityLevelInfo,
+		fmt.Sprintf("Converted %s into %s", otelcolconvert.StringifyInstanceID(id), otelcolconvert.StringifyBlock(block)),
+	)
+
+	state.Body().AppendBlock(block)
+	return diags
+}
+
+func includeRelabelConfig(label string, cfg component.Config, state *otelcolconvert.State, forwardTo []storage.Appendable) *relabel.Exports {
+	pb := build.NewPrometheusBlocks()
+
+	defaultRelabelConfigs := &flow_relabel.Config{}
+	defaultRelabelConfigs.SetToDefault()
+	relabelConfigs := []*prom_relabel.Config{}
+
+	// Sort the keys so map iteration order is deterministic.
+	keys := make([]string, 0, len(cfg.(*remotewriteexporter.Config).ConstLabels))
+	for key := range cfg.(*remotewriteexporter.Config).ConstLabels {
+		keys = append(keys, key)
+	}
+	sort.Strings(keys)
+
+	for _, key := range keys {
+		relabelConfigs = append(relabelConfigs, &prom_relabel.Config{
+			Separator:   defaultRelabelConfigs.Separator,
+			Regex:       prom_relabel.Regexp(defaultRelabelConfigs.Regex),
+			Modulus:     defaultRelabelConfigs.Modulus,
+			TargetLabel: key,
+			Replacement: cfg.(*remotewriteexporter.Config).ConstLabels[key],
+			Action:      prom_relabel.Action(defaultRelabelConfigs.Action),
+		})
+	}
+
+	exports := prometheus_component.AppendPrometheusRelabel(pb, relabelConfigs, forwardTo, label)
+	pb.AppendToBody(state.Body())
+	return exports
+}
+
+func toRemoteWriteExporterConfig(cfg *remotewriteexporter.Config, forwardTo []storage.Appendable) *prometheus.Arguments {
+	defaultArgs := &prometheus.Arguments{}
+	defaultArgs.SetToDefault()
+
+	return &prometheus.Arguments{
+		IncludeTargetInfo:             defaultArgs.IncludeTargetInfo,
+		IncludeScopeInfo:              defaultArgs.IncludeScopeInfo,
+		IncludeScopeLabels:            defaultArgs.IncludeScopeLabels,
+		GCFrequency:                   cfg.StaleTime,
+		ForwardTo:                     forwardTo,
+		AddMetricSuffixes:             defaultArgs.AddMetricSuffixes,
+		ResourceToTelemetryConversion: defaultArgs.ResourceToTelemetryConversion,
+	}
+}
diff --git a/internal/converter/internal/staticconvert/internal/build/self_exporter.go b/internal/converter/internal/staticconvert/internal/build/self_exporter.go
index 31e7b50551e7..51c1049349d5 100644
--- a/internal/converter/internal/staticconvert/internal/build/self_exporter.go
+++ b/internal/converter/internal/staticconvert/internal/build/self_exporter.go
@@ -8,19 +8,19 @@ import (
 )

 func (b *ConfigBuilder) appendAgentExporter(config *agent_exporter.Config) discovery.Exports {
-	args := toAgentExporter(config)
+	args := toAgentExporter()
 	return b.appendExporterBlock(args, config.Name(), nil, "self")
 }

-func toAgentExporter(config *agent_exporter.Config) *self.Arguments {
+func toAgentExporter() *self.Arguments {
 	return &self.Arguments{}
 }

 func (b *ConfigBuilder) appendAgentExporterV2(config *agent_exporter_v2.Config) discovery.Exports {
-	args := toAgentExporterV2(config)
+	args := toAgentExporterV2()
 	return b.appendExporterBlock(args, config.Name(), config.Common.InstanceKey, "self")
 }

-func toAgentExporterV2(config *agent_exporter_v2.Config) *self.Arguments {
+func toAgentExporterV2() *self.Arguments {
 	return &self.Arguments{}
 }
diff --git a/internal/converter/internal/staticconvert/testdata/traces.river b/internal/converter/internal/staticconvert/testdata/traces.river
index f8d8a4e8f584..0e1684d924b1 100644
--- a/internal/converter/internal/staticconvert/testdata/traces.river
+++ b/internal/converter/internal/staticconvert/testdata/traces.river
@@ -1,3 +1,14 @@
+prometheus.remote_write "metrics_remote_write_name" {
+	endpoint {
+		name = "remote_write_name-149bbd"
+		url  = "http://localhost:9009/api/prom/push"
+
+		queue_config { }
+
+		metadata_config { }
+	}
+}
+
 otelcol.extension.jaeger_remote_sampling "default_0" {
 	grpc { }

@@ -88,10 +99,29 @@ otelcol.processor.attributes "_0_default" {
 	output {
 		metrics = []
 		logs    = []
-		traces  = [otelcol.exporter.loadbalancing._0_default.input, otelcol.exporter.logging._0_default.input]
+		traces  = [otelcol.exporter.loadbalancing._0_default.input, otelcol.exporter.logging._0_default.input, otelcol.connector.spanmetrics._0_default.input]
 	}
 }

+prometheus.relabel "_0_default" {
+	forward_to = [prometheus.remote_write.metrics_remote_write_name.receiver]
+
+	rule {
+		target_label = "fizz"
+		replacement  = "buzz"
+	}
+
+	rule {
+		target_label = "foo"
+		replacement  = "bar"
+	}
+}
+
+otelcol.exporter.prometheus "_0_default" {
+	gc_frequency = "0s"
+	forward_to   = [prometheus.relabel._0_default.receiver]
+}
+
 otelcol.exporter.loadbalancing "_0_default" {
 	protocol {
 		otlp {
@@ -114,6 +144,17 @@ otelcol.exporter.loadbalancing "_0_default" {

 otelcol.exporter.logging "_0_default" { }

+otelcol.connector.spanmetrics "_0_default" {
+	histogram {
+		explicit { }
+	}
+	namespace = "metrics_prefix"
+
+	output {
+		metrics = [otelcol.exporter.prometheus._0_default.input]
+	}
+}
+
 otelcol.receiver.otlp "_1_lb" {
 	grpc {
 		endpoint = "0.0.0.0:4318"
@@ -146,10 +187,29 @@ otelcol.processor.batch "_1_default" {
 	output {
 		metrics = []
 		logs    = []
-		traces  = [otelcol.exporter.otlp._1_0.input, otelcol.exporter.logging._1_default.input]
+		traces  = [otelcol.exporter.otlp._1_0.input, otelcol.exporter.logging._1_default.input, otelcol.connector.spanmetrics._1_default.input]
+	}
+}
+
+prometheus.relabel "_1_default" {
+	forward_to = [prometheus.remote_write.metrics_remote_write_name.receiver]
+
+	rule {
+		target_label = "fizz"
+		replacement  = "buzz"
+	}
+
+	rule {
+		target_label = "foo"
+		replacement  = "bar"
 	}
 }

+otelcol.exporter.prometheus "_1_default" {
+	gc_frequency = "0s"
+	forward_to   = [prometheus.relabel._1_default.receiver]
+}
+
 otelcol.exporter.otlp "_1_0" {
 	retry_on_failure {
 		max_elapsed_time = "1m0s"
@@ -165,3 +225,14 @@ otelcol.exporter.otlp "_1_0" {
 }

 otelcol.exporter.logging "_1_default" { }
+
+otelcol.connector.spanmetrics "_1_default" {
+	histogram {
+		explicit { }
+	}
+	namespace = "metrics_prefix"
+
+	output {
+		metrics = [otelcol.exporter.prometheus._1_default.input]
+	}
+}
diff --git a/internal/converter/internal/staticconvert/testdata/traces.yaml b/internal/converter/internal/staticconvert/testdata/traces.yaml
index 5a4cb2dfd332..a97262722fc5 100644
--- a/internal/converter/internal/staticconvert/testdata/traces.yaml
+++ b/internal/converter/internal/staticconvert/testdata/traces.yaml
@@ -40,9 +40,12 @@ traces:
       prom_sd_pod_associations:
         - ip
         - net.host.ip
-      # spanmetrics:
-      #   namespace: testing
-      #   metrics_instance: default
+      spanmetrics:
+        metrics_instance: remote_write_name
+        namespace: metrics_prefix
+        const_labels:
+          foo: bar
+          fizz: buzz
       tail_sampling:
         policies:
           [
@@ -67,7 +70,9 @@ traces:

 # This metrics config is needed when we enable spanmetrics for traces
 #
-# metrics:
-#   global:
-#     remote_write:
-#       - url: http://localhost:9009/api/prom/push
+metrics:
+  global:
+    remote_write:
+      - url: http://localhost:9009/api/prom/push
+  configs:
+    - name: remote_write_name
\ No newline at end of file
diff --git a/internal/converter/internal/staticconvert/testdata/unsupported.diags b/internal/converter/internal/staticconvert/testdata/unsupported.diags
index 0958f0e79e69..c2b745cc05a2 100644
--- a/internal/converter/internal/staticconvert/testdata/unsupported.diags
+++ b/internal/converter/internal/staticconvert/testdata/unsupported.diags
@@ -1,6 +1,7 @@
 (Error) The converter does not support handling integrations which are not being scraped: mssql.
 (Error) mapping_config is not supported in statsd_exporter integrations config
 (Error) automatic_logging for traces has no direct flow equivalent. A best effort translation can be made which only outputs to stdout and not directly to loki by bypassing errors.
+(Error) Cannot convert using configuration including spanmetrics handler_endpoint. No equivalent exists for exposing a known /metrics endpoint. You can use metrics_instance instead to enable conversion.
 (Warning) Please review your agent command line flags and ensure they are set in your Flow mode config file where necessary.
 (Error) The converter does not support converting the provided grpc_tls_config server config: flow mode does not have a gRPC server to configure.
 (Error) The converter does not support converting the provided prefer_server_cipher_suites server config.
diff --git a/internal/converter/internal/staticconvert/testdata/unsupported.yaml b/internal/converter/internal/staticconvert/testdata/unsupported.yaml
index 8dd14ab12531..d43a369fadfc 100644
--- a/internal/converter/internal/staticconvert/testdata/unsupported.yaml
+++ b/internal/converter/internal/staticconvert/testdata/unsupported.yaml
@@ -64,6 +64,8 @@ traces:
         - endpoint: http://localhost:1234/write
       automatic_logging:
         backend: "something else"
+      spanmetrics:
+        handler_endpoint: http://localhost:1234/write

 agent_management:
   host: host_name
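A note for reviewers on the otelcolconvert.go change: buildConverterTable now resolves converter-key conflicts in favor of whichever converter appears first, which is why extraConverters is prepended to the built-in list. The pattern can be sketched in isolation; the types below are simplified stand-ins, not the agent's actual converter API:

```go
package main

import "fmt"

// converterKey mirrors the idea of keying on (component.Kind, component.Type);
// plain strings here are simplified stand-ins for the real types.
type converterKey struct {
	kind, typ string
}

type converter struct {
	name string
	key  converterKey
}

// buildTable registers converters in order and skips keys that are already
// present, so the first converter in the slice wins any conflict.
func buildTable(convs []converter) map[converterKey]string {
	table := make(map[converterKey]string)
	for _, c := range convs {
		if _, ok := table[c.key]; ok {
			continue // earlier registration takes precedence
		}
		table[c.key] = c.name
	}
	return table
}

func main() {
	key := converterKey{kind: "exporter", typ: "remote_write"}
	extra := []converter{{name: "static-remote-write", key: key}}
	builtin := []converter{{name: "generic-remote-write", key: key}}

	// Mirrors append(extraConverters, converters...): extras come first,
	// so the static converter shadows the generic one for the same key.
	table := buildTable(append(extra, builtin...))
	fmt.Println(table[key]) // static-remote-write
}
```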
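The pipeline rewiring in translateSpanMetrics follows the same shape as this standalone sketch. It assumes a toy model (plain strings instead of component.ID values, and a name-prefix check instead of ix.Type()), so it illustrates the wiring rather than the converter's real data structures:

```go
package main

import (
	"fmt"
	"strings"
)

// pipeline holds just the parts the rewiring touches.
type pipeline struct {
	Receivers []string
	Exporters []string
}

// wireSpanMetrics adds the spanmetrics connector as an exporter on every
// traces pipeline and creates a parallel metrics pipeline that reads from
// the connector and writes to remote_write.
func wireSpanMetrics(pipelines map[string]*pipeline) {
	// Collect the traces pipelines first so the map isn't mutated mid-iteration.
	var traces []string
	for name := range pipelines {
		if strings.HasPrefix(name, "traces") {
			traces = append(traces, name)
		}
	}

	for _, name := range traces {
		pipelines[name].Exporters = append(pipelines[name].Exporters, "spanmetrics")

		metricsName := strings.Replace(name, "traces", "metrics", 1)
		pipelines[metricsName] = &pipeline{
			Receivers: []string{"spanmetrics"},
			Exporters: []string{"remote_write"},
		}
	}
}

func main() {
	p := map[string]*pipeline{
		"traces/0": {Receivers: []string{"otlp"}, Exporters: []string{"otlp", "logging"}},
	}
	wireSpanMetrics(p)
	for name, pl := range p {
		fmt.Println(name, pl.Receivers, pl.Exporters)
	}
}
```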
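Finally, the reason includeRelabelConfig sorts the const_labels keys is that Go randomizes map iteration order; sorting is what keeps the fizz-before-foo rule order in traces.river stable across conversions. A self-contained sketch of just that mapping, with a simplified rule struct standing in for the Prometheus relabel config:

```go
package main

import (
	"fmt"
	"sort"
)

// rule is a simplified stand-in for a relabel rule that only sets
// target_label and replacement, as the converter does for const_labels.
type rule struct {
	TargetLabel string
	Replacement string
}

// constLabelRules emits one rule per const label, sorting keys first so the
// generated configuration is deterministic across runs.
func constLabelRules(constLabels map[string]string) []rule {
	keys := make([]string, 0, len(constLabels))
	for k := range constLabels {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	rules := make([]rule, 0, len(keys))
	for _, k := range keys {
		rules = append(rules, rule{TargetLabel: k, Replacement: constLabels[k]})
	}
	return rules
}

func main() {
	// Matches the testdata: fizz/buzz sorts before foo/bar.
	fmt.Println(constLabelRules(map[string]string{"foo": "bar", "fizz": "buzz"}))
}
```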