Skip to content

Commit

Permalink
otelcolconvert: support converting servicegraph connector (#6681)
Browse files Browse the repository at this point in the history
* otelcolconvert: support converting servicegraph connector

Signed-off-by: Paschalis Tsilias <[email protected]>

* fix for otel convert connectors

Signed-off-by: erikbaranowski <[email protected]>

---------

Signed-off-by: Paschalis Tsilias <[email protected]>
Signed-off-by: erikbaranowski <[email protected]>
Co-authored-by: Erik Baranowski <[email protected]>
  • Loading branch information
tpaschalis and erikbaranowski authored Mar 27, 2024
1 parent e908a2f commit ef2f5f3
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/agent/internal/component/otelcol"
"github.com/grafana/agent/internal/component/otelcol/connector/servicegraph"
"github.com/grafana/agent/internal/converter/diag"
"github.com/grafana/agent/internal/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/connector/servicegraphconnector"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, servicegraphConnectorConverter{})
}

type servicegraphConnectorConverter struct{}

func (servicegraphConnectorConverter) Factory() component.Factory {
return servicegraphconnector.NewFactory()
}

func (servicegraphConnectorConverter) InputComponentName() string {
return "otelcol.connector.servicegraph"
}

func (servicegraphConnectorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.FlowComponentLabel()

args := toServicegraphConnector(state, id, cfg.(*servicegraphconnector.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "connector", "servicegraph"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toServicegraphConnector(state *State, id component.InstanceID, cfg *servicegraphconnector.Config) *servicegraph.Arguments {
if cfg == nil {
return nil
}
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
)

return &servicegraph.Arguments{
LatencyHistogramBuckets: cfg.LatencyHistogramBuckets,
Dimensions: cfg.Dimensions,
Store: servicegraph.StoreConfig{
MaxItems: cfg.Store.MaxItems,
TTL: cfg.Store.TTL,
},
CacheLoop: cfg.CacheLoop,
StoreExpirationLoop: cfg.StoreExpirationLoop,
MetricsFlushInterval: cfg.MetricsFlushInterval,
Output: &otelcol.ConsumerArguments{
Metrics: ToTokenizedConsumers(nextMetrics),
},
}
}
19 changes: 19 additions & 0 deletions internal/converter/internal/otelcolconvert/pipeline_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ func (group pipelineGroup) NextTraces(fromID component.InstanceID) []component.I
func nextInPipeline(pipeline *pipelines.PipelineConfig, fromID component.InstanceID) []component.InstanceID {
switch fromID.Kind {
case component.KindReceiver, component.KindConnector:
// Validate this receiver is part of the pipeline.
if !findInComponentIds(fromID, pipeline.Receivers) {
return nil
}

// Receivers and connectors should either send to the first processor
// if one exists or to every exporter otherwise.
if len(pipeline.Processors) > 0 {
Expand All @@ -183,6 +188,11 @@ func nextInPipeline(pipeline *pipelines.PipelineConfig, fromID component.Instanc
return toComponentInstanceIDs(component.KindExporter, pipeline.Exporters)

case component.KindProcessor:
// Validate this processor is part of the pipeline.
if !findInComponentIds(fromID, pipeline.Processors) {
return nil
}

// Processors should send to the next processor if one exists or to every
// exporter otherwise.
processorIndex := slices.Index(pipeline.Processors, fromID.ID)
Expand Down Expand Up @@ -217,3 +227,12 @@ func toComponentInstanceIDs(kind component.Kind, ids []component.ID) []component

return res
}

func findInComponentIds(fromID component.InstanceID, componentIDs []component.ID) bool {
for _, id := range componentIDs {
if fromID.ID == id {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ otelcol.receiver.otlp "default" {

otelcol.processor.batch "default" {
output {
metrics = [otelcol.exporter.otlp.default.input]
metrics = []
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.exporter.otlp.default.input]
traces = []
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
otelcol.receiver.otlp "default" {
grpc { }

http { }

output {
metrics = []
logs = []
traces = [otelcol.connector.servicegraph.default.input]
}
}

otelcol.exporter.otlp "default" {
sending_queue {
queue_size = 5000
}

client {
endpoint = "database:4317"
}
}

otelcol.connector.servicegraph "default" {
latency_histogram_buckets = ["100ms", "250ms", "1s", "5s", "10s"]
dimensions = ["dimension-1", "dimension-2"]

store {
max_items = 10
ttl = "1s"
}
cache_loop = "2m0s"
store_expiration_loop = "5s"

output {
metrics = [otelcol.exporter.otlp.default.input]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
receivers:
otlp:
protocols:
grpc:
http:

connectors:
servicegraph:
latency_histogram_buckets: [100ms, 250ms, 1s, 5s, 10s]
dimensions:
- dimension-1
- dimension-2
store:
ttl: 1s
max_items: 10
cache_loop: 2m
store_expiration_loop: 5s

exporters:
otlp:
# Our defaults have drifted from upstream, so we explicitly set our
# defaults below (balancer_name and queue_size).
endpoint: database:4317
balancer_name: pick_first
sending_queue:
queue_size: 5000

service:
pipelines:
traces:
receivers: [otlp]
exporters: [servicegraph]
metrics:
receivers: [servicegraph]
exporters: [otlp]
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ otelcol.receiver.otlp "default" {
http { }

output {
metrics = [otelcol.exporter.otlp.default.input]
metrics = []
logs = [otelcol.exporter.otlp.default.input]
traces = [otelcol.connector.spanmetrics.default.input]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ otelcol.receiver.otlp "default_traces" {
http { }

output {
metrics = [otelcol.exporter.otlp.default_metrics_backend.input]
metrics = []
logs = []
traces = [otelcol.exporter.otlp.default_traces_backend.input, otelcol.connector.spanmetrics.default.input]
}
Expand Down

0 comments on commit ef2f5f3

Please sign in to comment.