Skip to content

Commit

Permalink
converting spanmetrics from static traces to flow (#6806)
Browse files Browse the repository at this point in the history
* convert spanmetrics from static traces to flow

Signed-off-by: erikbaranowski <[email protected]>

---------

Signed-off-by: erikbaranowski <[email protected]>
  • Loading branch information
erikbaranowski authored Apr 3, 2024
1 parent 7393c83 commit ccff374
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 24 deletions.
29 changes: 20 additions & 9 deletions internal/converter/internal/otelcolconvert/otelcolconvert.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,28 +291,39 @@ func validateNoDuplicateReceivers(groups []pipelineGroup, connectorIDs []compone

func buildConverterTable(extraConverters []ComponentConverter) map[converterKey]ComponentConverter {
table := make(map[converterKey]ComponentConverter)
allConverters := append(converters, extraConverters...)

// Ordering is critical here because conflicting converters are resolved with
// the first one in the list winning.
allConverters := append(extraConverters, converters...)

for _, conv := range allConverters {
fact := conv.Factory()

var kinds []component.Kind
switch fact.(type) {
case receiver.Factory:
table[converterKey{Kind: component.KindReceiver, Type: fact.Type()}] = conv
kinds = append(kinds, component.KindReceiver)
case processor.Factory:
table[converterKey{Kind: component.KindProcessor, Type: fact.Type()}] = conv
kinds = append(kinds, component.KindProcessor)
case exporter.Factory:
table[converterKey{Kind: component.KindExporter, Type: fact.Type()}] = conv
kinds = append(kinds, component.KindExporter)
case connector.Factory:
table[converterKey{Kind: component.KindConnector, Type: fact.Type()}] = conv
kinds = append(kinds, component.KindConnector)
// We need this so the connector is available as a destination for state.Next
table[converterKey{Kind: component.KindExporter, Type: fact.Type()}] = conv
kinds = append(kinds, component.KindExporter)
// Technically, this isn't required to be here since the entry
// won't be required to look up a destination for state.Next, but
// adding to reinforce the idea of how connectors are used.
table[converterKey{Kind: component.KindReceiver, Type: fact.Type()}] = conv
kinds = append(kinds, component.KindReceiver)
case extension.Factory:
table[converterKey{Kind: component.KindExtension, Type: fact.Type()}] = conv
kinds = append(kinds, component.KindExtension)
}

for _, kind := range kinds {
// If a converter for this kind and type already exists, skip it.
if _, ok := table[converterKey{Kind: kind, Type: fact.Type()}]; ok {
continue
}
table[converterKey{Kind: kind, Type: fact.Type()}] = conv
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
"github.com/grafana/agent/internal/converter/diag"
"github.com/grafana/agent/internal/converter/internal/otelcolconvert"
"github.com/grafana/agent/internal/static/traces"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector"
otel_component "go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/exporter/loggingexporter"
"go.opentelemetry.io/collector/otelcol"
"go.opentelemetry.io/collector/service/pipelines"
)

// List of component converters. This slice is appended to by init functions in
Expand Down Expand Up @@ -38,6 +40,7 @@ func (b *ConfigBuilder) appendTraces() {
removeReceiver(otelCfg, "traces", "push_receiver")

b.translateAutomaticLogging(otelCfg, cfg)
b.translateSpanMetrics(otelCfg, cfg)

b.diags.AddAll(otelcolconvert.AppendConfig(b.f, otelCfg, labelPrefix, converters))
}
Expand Down Expand Up @@ -68,6 +71,74 @@ func (b *ConfigBuilder) translateAutomaticLogging(otelCfg *otelcol.Config, cfg t
removeProcessor(otelCfg, "traces", "automatic_logging")
}

// translateSpanMetrics rewrites a static-mode spanmetrics processor setup into
// the Flow equivalent: the custom processor, noop receiver, prometheus exporter,
// and spanmetrics metrics pipeline are removed, a spanmetrics connector is added,
// and each traces pipeline gains the connector as an exporter plus a matching
// metrics pipeline that forwards to remote_write.
//
// Fixes over the previous version:
//   - Traces pipeline IDs are collected before mutating otelCfg.Service.Pipelines.
//     Adding map entries while ranging over the same map leaves it unspecified
//     whether the new entries are visited (Go spec, "For statements with range
//     clause"); the old code only worked because new entries failed the type check.
//   - Grammar fix in the user-facing diagnostic ("to enable conversion").
func (b *ConfigBuilder) translateSpanMetrics(otelCfg *otelcol.Config, cfg traces.InstanceConfig) {
	if _, ok := otelCfg.Processors[otel_component.NewID("spanmetrics")]; !ok {
		return
	}

	// Remove the custom otel components and delete the custom metrics pipeline.
	removeProcessor(otelCfg, "traces", "spanmetrics")
	removeReceiver(otelCfg, "metrics", "noop")
	removeExporter(otelCfg, "metrics", "prometheus")
	removePipeline(otelCfg, "metrics", "spanmetrics")

	// If the spanmetrics configuration includes a handler_endpoint, we cannot convert it.
	// This is intentionally after the section above which removes the custom spanmetrics
	// processor so that the rest of the configuration can optionally be converted with
	// the error.
	if cfg.SpanMetrics.HandlerEndpoint != "" {
		b.diags.Add(diag.SeverityLevelError, "Cannot convert using configuration including spanmetrics handler_endpoint. "+
			"No equivalent exists for exposing a known /metrics endpoint. You can use metrics_instance instead to enable conversion.")
		return
	}

	// Add the spanmetrics connector to the otel config with the converted configuration.
	if otelCfg.Connectors == nil {
		otelCfg.Connectors = map[otel_component.ID]otel_component.Config{}
	}
	otelCfg.Connectors[otel_component.NewID("spanmetrics")] = toSpanmetricsConnector(cfg.SpanMetrics)

	// Add the spanmetrics connector to each traces pipeline as an exporter and create
	// metrics pipelines. The processing ordering for the span metrics connector differs
	// from the static pipelines since tail sampling in static mode processes after the
	// custom span metrics processor. This is ok because the tail sampling processor is
	// not processing metrics.
	spanmetricsID := otel_component.NewID("spanmetrics")
	remoteWriteID := otel_component.NewID("remote_write")

	// Snapshot the traces pipeline IDs before mutating the map (see doc comment).
	tracesIDs := make([]otel_component.ID, 0, len(otelCfg.Service.Pipelines))
	for ix := range otelCfg.Service.Pipelines {
		if ix.Type() == "traces" {
			tracesIDs = append(tracesIDs, ix)
		}
	}

	for _, ix := range tracesIDs {
		pipeline := otelCfg.Service.Pipelines[ix]
		pipeline.Exporters = append(pipeline.Exporters, spanmetricsID)

		metricsID := otel_component.NewIDWithName("metrics", ix.Name())
		otelCfg.Service.Pipelines[metricsID] = &pipelines.PipelineConfig{
			Receivers: []otel_component.ID{spanmetricsID},
			Exporters: []otel_component.ID{remoteWriteID},
		}
	}
}

// toSpanmetricsConnector maps the static spanmetrics processor settings onto
// the OTel spanmetrics connector configuration. It starts from the connector's
// factory defaults and overrides only the fields the static config sets
// (zero/empty values are left at their defaults).
func toSpanmetricsConnector(cfg *traces.SpanMetricsConfig) *spanmetricsconnector.Config {
	out := spanmetricsconnector.NewFactory().CreateDefaultConfig().(*spanmetricsconnector.Config)

	if cfg.Namespace != "" {
		out.Namespace = cfg.Namespace
	}
	if cfg.AggregationTemporality != "" {
		out.AggregationTemporality = cfg.AggregationTemporality
	}
	if cfg.DimensionsCacheSize != 0 {
		out.DimensionsCacheSize = cfg.DimensionsCacheSize
	}
	if cfg.MetricsFlushInterval != 0 {
		out.MetricsFlushInterval = cfg.MetricsFlushInterval
	}
	if len(cfg.LatencyHistogramBuckets) != 0 {
		out.Histogram.Explicit = &spanmetricsconnector.ExplicitHistogramConfig{
			Buckets: cfg.LatencyHistogramBuckets,
		}
	}
	for _, d := range cfg.Dimensions {
		out.Dimensions = append(out.Dimensions, spanmetricsconnector.Dimension{
			Name:    d.Name,
			Default: d.Default,
		})
	}

	return out
}

// removeReceiver removes a receiver from the otel config for a specific pipeline type.
func removeReceiver(otelCfg *otelcol.Config, pipelineType otel_component.Type, receiverType otel_component.Type) {
if _, ok := otelCfg.Receivers[otel_component.NewID(receiverType)]; !ok {
Expand Down Expand Up @@ -111,3 +182,30 @@ func removeProcessor(otelCfg *otelcol.Config, pipelineType otel_component.Type,
otelCfg.Service.Pipelines[ix].Processors = spr
}
}

// removeExporter removes an exporter from the otel config for a specific
// pipeline type: it deletes the exporter definition and strips the exporter
// ID from every pipeline of that type. It is a no-op when the exporter is
// not defined.
func removeExporter(otelCfg *otelcol.Config, pipelineType otel_component.Type, exporterType otel_component.Type) {
	exporterID := otel_component.NewID(exporterType)
	if _, ok := otelCfg.Exporters[exporterID]; !ok {
		return
	}
	delete(otelCfg.Exporters, exporterID)

	for id, pipeline := range otelCfg.Service.Pipelines {
		if id.Type() != pipelineType {
			continue
		}

		// Rebuild the exporter list without any entry of the removed type.
		kept := make([]otel_component.ID, 0)
		for _, exp := range pipeline.Exporters {
			if exp.Type() != exporterType {
				kept = append(kept, exp)
			}
		}
		otelCfg.Service.Pipelines[id].Exporters = kept
	}
}

// removePipeline removes the named pipeline of the given type from the otel
// config's service section. Deleting a missing key is a no-op.
func removePipeline(otelCfg *otelcol.Config, pipelineType otel_component.Type, pipelineName string) {
	id := otel_component.NewIDWithName(pipelineType, pipelineName)
	delete(otelCfg.Service.Pipelines, id)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/grafana/agent/internal/converter/internal/prometheusconvert/build"
prometheus_component "github.com/grafana/agent/internal/converter/internal/prometheusconvert/component"
"github.com/grafana/agent/internal/static/traces/promsdprocessor"
"github.com/grafana/river/scanner"
prom_config "github.com/prometheus/prometheus/config"
"go.opentelemetry.io/collector/component"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -81,7 +80,7 @@ func toDiscoveryProcessor(state *otelcolconvert.State, id component.InstanceID,
if label != "" {
labelConcat = label + "_" + scrapeConfig.JobName
}
label, _ := scanner.SanitizeIdentifier(labelConcat)
label := common.SanitizeIdentifierPanics(labelConcat)
scrapeTargets := prometheusconvert.AppendServiceDiscoveryConfigs(pb, scrapeConfig.ServiceDiscoveryConfigs, label)
promDiscoveryRelabelExports := prometheus_component.AppendDiscoveryRelabel(pb, scrapeConfig.RelabelConfigs, scrapeTargets, label)
if promDiscoveryRelabelExports != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package build

import (
"fmt"
"sort"

flow_relabel "github.com/grafana/agent/internal/component/common/relabel"
"github.com/grafana/agent/internal/component/otelcol/exporter/prometheus"
"github.com/grafana/agent/internal/component/prometheus/relabel"
"github.com/grafana/agent/internal/converter/diag"
"github.com/grafana/agent/internal/converter/internal/common"
"github.com/grafana/agent/internal/converter/internal/otelcolconvert"
"github.com/grafana/agent/internal/converter/internal/prometheusconvert/build"
prometheus_component "github.com/grafana/agent/internal/converter/internal/prometheusconvert/component"
"github.com/grafana/agent/internal/static/traces/remotewriteexporter"
prom_relabel "github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"go.opentelemetry.io/collector/component"
)

// Register remoteWriteExporterConverter with the package-level converter list
// so it is picked up when the converter table is built.
func init() {
	converters = append(converters, remoteWriteExporterConverter{})
}

// remoteWriteExporterConverter converts the static remote write exporter
// configuration into an otelcol.exporter.prometheus Flow component.
type remoteWriteExporterConverter struct{}

// Factory returns the factory for the static remote write exporter whose
// config this converter handles.
func (remoteWriteExporterConverter) Factory() component.Factory {
	return remotewriteexporter.NewFactory()
}

// InputComponentName returns the Flow component name this converter emits.
func (remoteWriteExporterConverter) InputComponentName() string {
	return "otelcol.exporter.prometheus"
}

// ConvertAndAppend converts the static remote write exporter config into an
// otelcol.exporter.prometheus block and appends it to the Flow body, optionally
// routing through a prometheus.relabel component when const_labels are set.
// Returns an info diagnostic describing the conversion.
//
// Fix: the concrete config type is asserted once up front instead of being
// repeated (and re-panicking independently) three times throughout the body.
func (remoteWriteExporterConverter) ConvertAndAppend(state *otelcolconvert.State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
	label := state.FlowComponentLabel()

	// Assert the concrete config type once; all later reads go through rwCfg.
	rwCfg := cfg.(*remotewriteexporter.Config)

	// We overloaded the ServerConfig.Endpoint field to be the prometheus.remote_write label
	rwLabel := "metrics_" + rwCfg.PromInstance
	forwardTo := []storage.Appendable{common.ConvertAppendable{Expr: fmt.Sprintf("prometheus.remote_write.%s.receiver", rwLabel)}}
	if len(rwCfg.ConstLabels) > 0 {
		// Insert a relabel component that applies the const labels before remote_write.
		exports := includeRelabelConfig(label, cfg, state, forwardTo)
		forwardTo = []storage.Appendable{exports.Receiver}
	}

	args := toremotewriteexporterConfig(rwCfg, forwardTo)
	block := common.NewBlockWithOverride([]string{"otelcol", "exporter", "prometheus"}, label, args)

	var diags diag.Diagnostics
	diags.Add(
		diag.SeverityLevelInfo,
		fmt.Sprintf("Converted %s into %s", otelcolconvert.StringifyInstanceID(id), otelcolconvert.StringifyBlock(block)),
	)

	state.Body().AppendBlock(block)
	return diags
}

// includeRelabelConfig appends a prometheus.relabel component that attaches the
// exporter's const_labels to every metric and returns its exports so the caller
// can forward metrics through it.
//
// Fixes over the previous version:
//   - Both loop variables shadowed the `label` parameter, which is still needed
//     at the end as the relabel component's label; the loops now use `name`.
//   - The *remotewriteexporter.Config assertion is done once instead of three times.
//   - Comment typo ("sort they keys") corrected.
func includeRelabelConfig(label string, cfg component.Config, state *otelcolconvert.State, forwardTo []storage.Appendable) *relabel.Exports {
	pb := build.NewPrometheusBlocks()

	defaultRelabelConfigs := &flow_relabel.Config{}
	defaultRelabelConfigs.SetToDefault()
	relabelConfigs := []*prom_relabel.Config{}

	constLabels := cfg.(*remotewriteexporter.Config).ConstLabels

	// Sort the keys for consistent output across map iterations.
	keys := make([]string, 0, len(constLabels))
	for name := range constLabels {
		keys = append(keys, name)
	}
	sort.Strings(keys)

	for _, name := range keys {
		relabelConfigs = append(relabelConfigs, &prom_relabel.Config{
			Separator:   defaultRelabelConfigs.Separator,
			Regex:       prom_relabel.Regexp(defaultRelabelConfigs.Regex),
			Modulus:     defaultRelabelConfigs.Modulus,
			TargetLabel: name,
			Replacement: constLabels[name],
			Action:      prom_relabel.Action(defaultRelabelConfigs.Action),
		})
	}

	exports := prometheus_component.AppendPrometheusRelabel(pb, relabelConfigs, forwardTo, label)
	pb.AppendToBody(state.Body())
	return exports
}

// toremotewriteexporterConfig builds the otelcol.exporter.prometheus arguments
// for the given static remote write exporter config. Only GCFrequency (from the
// static stale_time) and ForwardTo come from the inputs; the other listed fields
// are copied from the Flow defaults.
//
// NOTE(review): any prometheus.Arguments field NOT listed in the literal below
// ends up zero-valued rather than defaulted — confirm that is intended for any
// fields SetToDefault populates beyond these.
func toremotewriteexporterConfig(cfg *remotewriteexporter.Config, forwardTo []storage.Appendable) *prometheus.Arguments {
	defaultArgs := &prometheus.Arguments{}
	defaultArgs.SetToDefault()

	return &prometheus.Arguments{
		IncludeTargetInfo:             defaultArgs.IncludeTargetInfo,
		IncludeScopeInfo:              defaultArgs.IncludeScopeInfo,
		IncludeScopeLabels:            defaultArgs.IncludeScopeLabels,
		GCFrequency:                   cfg.StaleTime,
		ForwardTo:                     forwardTo,
		AddMetricSuffixes:             defaultArgs.AddMetricSuffixes,
		ResourceToTelemetryConversion: defaultArgs.ResourceToTelemetryConversion,
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import (
)

func (b *ConfigBuilder) appendAgentExporter(config *agent_exporter.Config) discovery.Exports {
args := toAgentExporter(config)
args := toAgentExporter()
return b.appendExporterBlock(args, config.Name(), nil, "self")
}

func toAgentExporter(config *agent_exporter.Config) *self.Arguments {
func toAgentExporter() *self.Arguments {
return &self.Arguments{}
}

func (b *ConfigBuilder) appendAgentExporterV2(config *agent_exporter_v2.Config) discovery.Exports {
args := toAgentExporterV2(config)
args := toAgentExporterV2()
return b.appendExporterBlock(args, config.Name(), config.Common.InstanceKey, "self")
}

func toAgentExporterV2(config *agent_exporter_v2.Config) *self.Arguments {
func toAgentExporterV2() *self.Arguments {
return &self.Arguments{}
}
Loading

0 comments on commit ccff374

Please sign in to comment.