Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(POC): otelcol httpcheck receiver - use tags as metrics labels #2881

Draft
wants to merge 13 commits into
base: develop
Choose a base branch
from
46 changes: 13 additions & 33 deletions agent/backend/otel/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (o *openTelemetryBackend) Version() (string, error) {
return o.otelCurrVersion, nil
}
ctx, cancel := context.WithTimeout(o.mainContext, 60*time.Second)
defer cancel()
var versionOutput string
command := cmd.NewCmd(o.otelExecutablePath, "--version")
status := command.Start()
Expand All @@ -127,7 +128,6 @@ func (o *openTelemetryBackend) Version() (string, error) {
o.logger.Error("timeout during getting version", zap.Error(ctx.Err()))
}

cancel()
o.logger.Info("running opentelemetry-contrib version", zap.String("version", versionOutput))

return versionOutput, nil
Expand Down Expand Up @@ -239,50 +239,30 @@ func (o *openTelemetryBackend) createOtlpMetricMqttExporter(ctx context.Context,

func (o *openTelemetryBackend) createOtlpTraceMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Traces, error) {
bridgeService := otel.NewBridgeService(ctx, cancelFunc, &o.policyRepo, o.agentTags)
var cfg component.Config
if o.mqttClient != nil {
cfg := otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpTracesTopic, "", bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
// Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced.
tracerExporter, err := otlpmqttexporter.CreateTracesExporter(ctx, set, cfg)
if err != nil {
return nil, err
}
return tracerExporter, nil
cfg = otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpTracesTopic, "", bridgeService)
} else {
cfg := otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key,
cfg = otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key,
o.mqttConfig.ChannelID, "", o.otlpTracesTopic, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
// Create the OTLP metrics exporter that'll receive and verify the metrics produced.
tracerExporter, err := otlpmqttexporter.CreateTracesExporter(ctx, set, cfg)
if err != nil {
return nil, err
}
return tracerExporter, nil
}

set := otlpmqttexporter.CreateDefaultSettings(o.logger)
// Create the OTLP traces that'll receive and verify the metrics produced.
return otlpmqttexporter.CreateTracesExporter(ctx, set, cfg)
}

func (o *openTelemetryBackend) createOtlpLogsMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Logs, error) {
bridgeService := otel.NewBridgeService(ctx, cancelFunc, &o.policyRepo, o.agentTags)
var cfg component.Config
if o.mqttClient != nil {
cfg := otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpLogsTopic, "", bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
// Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced.
exporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg)
if err != nil {
return nil, err
}
return exporter, nil
cfg = otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpLogsTopic, "", bridgeService)
} else {
cfg := otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key,
cfg = otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key,
o.mqttConfig.ChannelID, "", o.otlpLogsTopic, bridgeService)
set := otlpmqttexporter.CreateDefaultSettings(o.logger)
// Create the OTLP metrics exporter that'll receive and verify the metrics produced.
exporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg)
if err != nil {
return nil, err
}
return exporter, nil
}

set := otlpmqttexporter.CreateDefaultSettings(o.logger)
// Create the OTLP logs exporter that'll receive and verify the metrics produced.
return otlpmqttexporter.CreateLogsExporter(ctx, set, cfg)
}
45 changes: 44 additions & 1 deletion agent/backend/otel/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package otel

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
Expand All @@ -26,7 +27,14 @@ type runningPolicy struct {

func (o *openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error {
o.logger.Debug("applying policy", zap.String("policy_id", newPolicyData.ID))
policyYaml, err := yaml.Marshal(newPolicyData.Data)

sanitizedPolicyData, err := SanitizePolicyData(newPolicyData)
if err != nil {
o.logger.Error("deleting tags from httpcheck targets failed", zap.String("policy_id", newPolicyData.ID), zap.Error(err))
return err
}

policyYaml, err := yaml.Marshal(sanitizedPolicyData.Data)
if err != nil {
o.logger.Warn("yaml policy marshal failure", zap.String("policy_id", newPolicyData.ID), zap.Any("policy", newPolicyData.Data))
return err
Expand Down Expand Up @@ -125,6 +133,7 @@ func (o *openTelemetryBackend) addRunner(policyData policies.PolicyData, policyF
}
}(policyContext, o.logger)
status := command.Status()

policyEntry := runningPolicy{
ctx: policyContext,
cancel: policyCancel,
Expand Down Expand Up @@ -175,3 +184,37 @@ func (o *openTelemetryBackend) ValidatePolicy(otelConfig openTelemetryConfig) er

return nil
}

func SanitizePolicyData(policyData policies.PolicyData) (*policies.PolicyData, error) {
originalJSON, err := json.Marshal(policyData)
if err != nil {
return nil, err
}
var policyDataClone policies.PolicyData
if err = json.Unmarshal(originalJSON, &policyDataClone); err != nil {
return nil, err
}

if policyDataClone.Backend == "otel" {
receivers, ok := policyDataClone.Data.(map[string]interface{})["receivers"]
if !ok {
return &policyData, nil
}
httpcheck, ok := receivers.(map[string]interface{})["httpcheck"]
if !ok {
return &policyData, nil
}
targets, ok := httpcheck.(map[string]interface{})["targets"]
if !ok {
return &policyData, nil
}
for _, target := range targets.([]interface{}) {
if _, ok := target.(map[string]interface{})["tags"]; !ok {
return &policyData, nil
}
delete(target.(map[string]interface{}), "tags")
}
}

return &policyDataClone, nil
}
63 changes: 63 additions & 0 deletions agent/backend/otel/policy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package otel_test

import (
"testing"
"time"

"github.com/orb-community/orb/agent/backend/otel"
"github.com/orb-community/orb/agent/policies"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
)

func TestSanitizePolicyData(t *testing.T) {
newPolicyData := policies.PolicyData{
ID: "test-policy-id",
Name: "test-policy",
Backend: "otel",
Version: 0,
Format: "yaml",
State: policies.Running,
LastScrapeBytes: 0,
LastScrapeTS: time.Now(),
Data: map[string]interface{}{
"receivers": map[string]interface{}{
"httpcheck": map[string]interface{}{
"collection_interval": "60s",
"targets": []interface{}{
map[string]interface{}{
"endpoint": "https://example.com",
"method": "GET",
"tags": map[string]string{
"foo": "bar",
},
},
},
},
},
"exporters": map[string]interface{}{},
"service": map[string]interface{}{
"pipelines": map[string]interface{}{
"metrics": map[string]interface{}{
"exporters": nil,
"receivers": []string{"httpcheck"},
},
},
},
},
PreviousPolicyData: nil,
}

policyYaml, err := yaml.Marshal(newPolicyData.Data)
require.NoError(t, err)

copyPolicyData, err := otel.SanitizePolicyData(newPolicyData)
require.NoError(t, err)

copyPolicyYaml, err := yaml.Marshal(copyPolicyData.Data)
require.NoError(t, err)

assert.NotEqual(t, newPolicyData, copyPolicyData)
assert.NotEqual(t, string(policyYaml), string(copyPolicyYaml))
}
7 changes: 6 additions & 1 deletion agent/otel/bridgeservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

type AgentBridgeService interface {
RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error)
RetrievePolicyByName(name string) (policies.PolicyData, error)
NotifyAgentDisconnection(ctx context.Context, err error)
}

Expand Down Expand Up @@ -48,7 +49,11 @@ func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent
}, nil
}

func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) {
func (b *BridgeService) RetrievePolicyByName(name string) (policies.PolicyData, error) {
return b.policyRepo.GetByName(name)
}

func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, _ error) {
ctx.Done()
b.cancelFunc()
}
23 changes: 23 additions & 0 deletions agent/otel/otlpmqttexporter/collectorconfig.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package otlpmqttexporter

import (
"go.opentelemetry.io/collector/component"
)

type CollectorConfig struct {
Receivers map[component.ID]component.Config `mapstructure:"receivers"`
Extensions map[string]interface{} `mapstructure:"extensions,omitempty"`
Exporters map[string]interface{} `mapstructure:"exporters,omitempty"`
Service map[string]interface{} `mapstructure:"service,omitempty"`
}

type HTTPCheckReceiver struct {
CollectionInterval string `mapstructure:"collection_interval"`
Targets []HTTPCheckTarget `mapstructure:"targets"`
}

type HTTPCheckTarget struct {
Endpoint string `mapstructure:"endpoint"`
Method string `mapstructure:"method"`
Tags map[string]string `mapstructure:"tags,omitempty"`
}
81 changes: 81 additions & 0 deletions agent/otel/otlpmqttexporter/collectorconfig_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package otlpmqttexporter_test

import (
"encoding/json"
"testing"

"github.com/mitchellh/mapstructure"
"github.com/orb-community/orb/agent/otel/otlpmqttexporter"
"github.com/orb-community/orb/agent/policies"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestExtractCollectorConfig(t *testing.T) {
policyJSON := `{
"ID": "test-policy-id",
"Datasets": null,
"GroupIds": null,
"Name": "test-policy",
"Backend": "otel",
"Version": 0,
"Format": "yaml",
"Data": {
"exporters": {},
"receivers": {
"httpcheck": {
"collection_interval": "60s",
"targets": [
{
"endpoint": "https://example.com",
"method": "GET",
"tags": {
"foo": "bar"
}
}
]
}
},
"service": {
"pipelines": {
"metrics": {
"exporters": null,
"receivers": [
"httpcheck"
]
}
}
}
},
"State": 1,
"BackendErr": "",
"LastScrapeBytes": 0,
"LastScrapeTS": "2023-12-18T13:57:42.024296Z",
"PreviousPolicyData": null
}`
var policy policies.PolicyData
if err := json.Unmarshal([]byte(policyJSON), &policy); err != nil {
t.Fatal(err)
}

cfg, err := otlpmqttexporter.ExtractCollectorConfig(policy)
require.NoError(t, err)
assert.Equal(t, 1, len(cfg.Receivers))

for key, value := range cfg.Receivers {
switch key.Type() {
case "httpcheck":
var httpcheck otlpmqttexporter.HTTPCheckReceiver
if err := mapstructure.Decode(value, &httpcheck); err != nil {
t.Fatal(err)
}
assert.Equal(t, "60s", httpcheck.CollectionInterval)
assert.Equal(t, 1, len(httpcheck.Targets))
for _, target := range httpcheck.Targets {
assert.Equal(t, "https://example.com", target.Endpoint)
assert.Equal(t, "GET", target.Method)
assert.Equal(t, map[string]string{"foo": "bar"}, target.Tags)
}
}
}
}
6 changes: 3 additions & 3 deletions agent/otel/otlpmqttexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func CreateTracesExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Traces, error) {
oce, err := newExporter(cfg, set, ctx)
oce, err := newExporter(cfg, set)
if err != nil {
return nil, err
}
Expand All @@ -119,7 +119,7 @@ func CreateMetricsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Metrics, error) {
oce, err := newExporter(cfg, set, ctx)
oce, err := newExporter(cfg, set)
if err != nil {
return nil, err
}
Expand All @@ -142,7 +142,7 @@ func CreateLogsExporter(
set exporter.CreateSettings,
cfg component.Config,
) (exporter.Logs, error) {
oce, err := newExporter(cfg, set, ctx)
oce, err := newExporter(cfg, set)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading