diff --git a/agent/backend/otel/otel.go b/agent/backend/otel/otel.go index 68dff2454..ae262d69c 100644 --- a/agent/backend/otel/otel.go +++ b/agent/backend/otel/otel.go @@ -110,6 +110,7 @@ func (o *openTelemetryBackend) Version() (string, error) { return o.otelCurrVersion, nil } ctx, cancel := context.WithTimeout(o.mainContext, 60*time.Second) + defer cancel() var versionOutput string command := cmd.NewCmd(o.otelExecutablePath, "--version") status := command.Start() @@ -127,7 +128,6 @@ func (o *openTelemetryBackend) Version() (string, error) { o.logger.Error("timeout during getting version", zap.Error(ctx.Err())) } - cancel() o.logger.Info("running opentelemetry-contrib version", zap.String("version", versionOutput)) return versionOutput, nil @@ -239,50 +239,30 @@ func (o *openTelemetryBackend) createOtlpMetricMqttExporter(ctx context.Context, func (o *openTelemetryBackend) createOtlpTraceMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Traces, error) { bridgeService := otel.NewBridgeService(ctx, cancelFunc, &o.policyRepo, o.agentTags) + var cfg component.Config if o.mqttClient != nil { - cfg := otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpTracesTopic, "", bridgeService) - set := otlpmqttexporter.CreateDefaultSettings(o.logger) - // Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced. - tracerExporter, err := otlpmqttexporter.CreateTracesExporter(ctx, set, cfg) - if err != nil { - return nil, err - } - return tracerExporter, nil + cfg = otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpTracesTopic, "", bridgeService) } else { - cfg := otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key, + cfg = otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key, o.mqttConfig.ChannelID, "", o.otlpTracesTopic, bridgeService) - set := otlpmqttexporter.CreateDefaultSettings(o.logger) - // Create the OTLP metrics exporter that'll receive and verify the metrics produced. - tracerExporter, err := otlpmqttexporter.CreateTracesExporter(ctx, set, cfg) - if err != nil { - return nil, err - } - return tracerExporter, nil } + set := otlpmqttexporter.CreateDefaultSettings(o.logger) + // Create the OTLP traces that'll receive and verify the metrics produced. + return otlpmqttexporter.CreateTracesExporter(ctx, set, cfg) } func (o *openTelemetryBackend) createOtlpLogsMqttExporter(ctx context.Context, cancelFunc context.CancelFunc) (exporter.Logs, error) { bridgeService := otel.NewBridgeService(ctx, cancelFunc, &o.policyRepo, o.agentTags) + var cfg component.Config if o.mqttClient != nil { - cfg := otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpLogsTopic, "", bridgeService) - set := otlpmqttexporter.CreateDefaultSettings(o.logger) - // Create the OTLP metrics metricsExporter that'll receive and verify the metrics produced. - exporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg) - if err != nil { - return nil, err - } - return exporter, nil + cfg = otlpmqttexporter.CreateConfigClient(o.mqttClient, o.otlpLogsTopic, "", bridgeService) } else { - cfg := otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key, + cfg = otlpmqttexporter.CreateConfig(o.mqttConfig.Address, o.mqttConfig.Id, o.mqttConfig.Key, o.mqttConfig.ChannelID, "", o.otlpLogsTopic, bridgeService) - set := otlpmqttexporter.CreateDefaultSettings(o.logger) - // Create the OTLP metrics exporter that'll receive and verify the metrics produced. - exporter, err := otlpmqttexporter.CreateLogsExporter(ctx, set, cfg) - if err != nil { - return nil, err - } - return exporter, nil } + set := otlpmqttexporter.CreateDefaultSettings(o.logger) + // Create the OTLP logs exporter that'll receive and verify the metrics produced. + return otlpmqttexporter.CreateLogsExporter(ctx, set, cfg) } diff --git a/agent/backend/otel/policy.go b/agent/backend/otel/policy.go index 0a2df10a0..2d2de96e5 100644 --- a/agent/backend/otel/policy.go +++ b/agent/backend/otel/policy.go @@ -2,6 +2,7 @@ package otel import ( "context" + "encoding/json" "errors" "fmt" "os" @@ -26,7 +27,14 @@ type runningPolicy struct { func (o *openTelemetryBackend) ApplyPolicy(newPolicyData policies.PolicyData, updatePolicy bool) error { o.logger.Debug("applying policy", zap.String("policy_id", newPolicyData.ID)) - policyYaml, err := yaml.Marshal(newPolicyData.Data) + + sanitizedPolicyData, err := SanitizePolicyData(newPolicyData) + if err != nil { + o.logger.Error("deleting tags from httpcheck targets failed", zap.String("policy_id", newPolicyData.ID), zap.Error(err)) + return err + } + + policyYaml, err := yaml.Marshal(sanitizedPolicyData.Data) if err != nil { o.logger.Warn("yaml policy marshal failure", zap.String("policy_id", newPolicyData.ID), zap.Any("policy", newPolicyData.Data)) return err @@ -125,6 +133,7 @@ func (o *openTelemetryBackend) addRunner(policyData policies.PolicyData, policyF } }(policyContext, o.logger) status := command.Status() + policyEntry := runningPolicy{ ctx: policyContext, cancel: policyCancel, @@ -175,3 +184,37 @@ func (o *openTelemetryBackend) ValidatePolicy(otelConfig openTelemetryConfig) er return nil } + +func SanitizePolicyData(policyData policies.PolicyData) (*policies.PolicyData, error) { + originalJSON, err := json.Marshal(policyData) + if err != nil { + return nil, err + } + var policyDataClone policies.PolicyData + if err = json.Unmarshal(originalJSON, &policyDataClone); err != nil { + return nil, err + } + + if policyDataClone.Backend == "otel" { + receivers, ok := policyDataClone.Data.(map[string]interface{})["receivers"] + if !ok { + return &policyData, nil + } + httpcheck, ok := receivers.(map[string]interface{})["httpcheck"] + if !ok { + return &policyData, nil + } + targets, ok := httpcheck.(map[string]interface{})["targets"] + if !ok { + return &policyData, nil + } + for _, target := range targets.([]interface{}) { + if _, ok := target.(map[string]interface{})["tags"]; !ok { + return &policyData, nil + } + delete(target.(map[string]interface{}), "tags") + } + } + + return &policyDataClone, nil +} diff --git a/agent/backend/otel/policy_test.go b/agent/backend/otel/policy_test.go new file mode 100644 index 000000000..dd294ee1a --- /dev/null +++ b/agent/backend/otel/policy_test.go @@ -0,0 +1,63 @@ +package otel_test + +import ( + "testing" + "time" + + "github.com/orb-community/orb/agent/backend/otel" + "github.com/orb-community/orb/agent/policies" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "gopkg.in/yaml.v3" +) + +func TestSanitizePolicyData(t *testing.T) { + newPolicyData := policies.PolicyData{ + ID: "test-policy-id", + Name: "test-policy", + Backend: "otel", + Version: 0, + Format: "yaml", + State: policies.Running, + LastScrapeBytes: 0, + LastScrapeTS: time.Now(), + Data: map[string]interface{}{ + "receivers": map[string]interface{}{ + "httpcheck": map[string]interface{}{ + "collection_interval": "60s", + "targets": []interface{}{ + map[string]interface{}{ + "endpoint": "https://example.com", + "method": "GET", + "tags": map[string]string{ + "foo": "bar", + }, + }, + }, + }, + }, + "exporters": map[string]interface{}{}, + "service": map[string]interface{}{ + "pipelines": map[string]interface{}{ + "metrics": map[string]interface{}{ + "exporters": nil, + "receivers": []string{"httpcheck"}, + }, + }, + }, + }, + PreviousPolicyData: nil, + } + + policyYaml, err := yaml.Marshal(newPolicyData.Data) + require.NoError(t, err) + + copyPolicyData, err := otel.SanitizePolicyData(newPolicyData) + require.NoError(t, err) + + copyPolicyYaml, err := yaml.Marshal(copyPolicyData.Data) + require.NoError(t, err) + + assert.NotEqual(t, newPolicyData, copyPolicyData) + assert.NotEqual(t, string(policyYaml), string(copyPolicyYaml)) +} diff --git a/agent/otel/bridgeservice.go b/agent/otel/bridgeservice.go index b933a344f..cd8eb0c72 100644 --- a/agent/otel/bridgeservice.go +++ b/agent/otel/bridgeservice.go @@ -9,6 +9,7 @@ import ( type AgentBridgeService interface { RetrieveAgentInfoByPolicyName(policyName string) (*AgentDataPerPolicy, error) + RetrievePolicyByName(name string) (policies.PolicyData, error) NotifyAgentDisconnection(ctx context.Context, err error) } @@ -48,7 +49,11 @@ func (b *BridgeService) RetrieveAgentInfoByPolicyName(policyName string) (*Agent }, nil } -func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, err error) { +func (b *BridgeService) RetrievePolicyByName(name string) (policies.PolicyData, error) { + return b.policyRepo.GetByName(name) +} + +func (b *BridgeService) NotifyAgentDisconnection(ctx context.Context, _ error) { ctx.Done() b.cancelFunc() } diff --git a/agent/otel/otlpmqttexporter/collectorconfig.go b/agent/otel/otlpmqttexporter/collectorconfig.go new file mode 100644 index 000000000..c3b02b61b --- /dev/null +++ b/agent/otel/otlpmqttexporter/collectorconfig.go @@ -0,0 +1,23 @@ +package otlpmqttexporter + +import ( + "go.opentelemetry.io/collector/component" +) + +type CollectorConfig struct { + Receivers map[component.ID]component.Config `mapstructure:"receivers"` + Extensions map[string]interface{} `mapstructure:"extensions,omitempty"` + Exporters map[string]interface{} `mapstructure:"exporters,omitempty"` + Service map[string]interface{} `mapstructure:"service,omitempty"` +} + +type HTTPCheckReceiver struct { + CollectionInterval string `mapstructure:"collection_interval"` + Targets []HTTPCheckTarget `mapstructure:"targets"` +} + +type HTTPCheckTarget struct { + Endpoint string `mapstructure:"endpoint"` + Method string `mapstructure:"method"` + Tags map[string]string `mapstructure:"tags,omitempty"` +} diff --git a/agent/otel/otlpmqttexporter/collectorconfig_test.go b/agent/otel/otlpmqttexporter/collectorconfig_test.go new file mode 100644 index 000000000..9a3cfee1c --- /dev/null +++ b/agent/otel/otlpmqttexporter/collectorconfig_test.go @@ -0,0 +1,81 @@ +package otlpmqttexporter_test + +import ( + "encoding/json" + "testing" + + "github.com/mitchellh/mapstructure" + "github.com/orb-community/orb/agent/otel/otlpmqttexporter" + "github.com/orb-community/orb/agent/policies" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtractCollectorConfig(t *testing.T) { + policyJSON := `{ + "ID": "test-policy-id", + "Datasets": null, + "GroupIds": null, + "Name": "test-policy", + "Backend": "otel", + "Version": 0, + "Format": "yaml", + "Data": { + "exporters": {}, + "receivers": { + "httpcheck": { + "collection_interval": "60s", + "targets": [ + { + "endpoint": "https://example.com", + "method": "GET", + "tags": { + "foo": "bar" + } + } + ] + } + }, + "service": { + "pipelines": { + "metrics": { + "exporters": null, + "receivers": [ + "httpcheck" + ] + } + } + } + }, + "State": 1, + "BackendErr": "", + "LastScrapeBytes": 0, + "LastScrapeTS": "2023-12-18T13:57:42.024296Z", + "PreviousPolicyData": null +}` + var policy policies.PolicyData + if err := json.Unmarshal([]byte(policyJSON), &policy); err != nil { + t.Fatal(err) + } + + cfg, err := otlpmqttexporter.ExtractCollectorConfig(policy) + require.NoError(t, err) + assert.Equal(t, 1, len(cfg.Receivers)) + + for key, value := range cfg.Receivers { + switch key.Type() { + case "httpcheck": + var httpcheck otlpmqttexporter.HTTPCheckReceiver + if err := mapstructure.Decode(value, &httpcheck); err != nil { + t.Fatal(err) + } + assert.Equal(t, "60s", httpcheck.CollectionInterval) + assert.Equal(t, 1, len(httpcheck.Targets)) + for _, target := range httpcheck.Targets { + assert.Equal(t, "https://example.com", target.Endpoint) + assert.Equal(t, "GET", target.Method) + assert.Equal(t, map[string]string{"foo": "bar"}, target.Tags) + } + } + } +} diff --git a/agent/otel/otlpmqttexporter/factory.go b/agent/otel/otlpmqttexporter/factory.go index 57f609c78..79f563064 100644 --- a/agent/otel/otlpmqttexporter/factory.go +++ b/agent/otel/otlpmqttexporter/factory.go @@ -95,7 +95,7 @@ func CreateTracesExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Traces, error) { - oce, err := newExporter(cfg, set, ctx) + oce, err := newExporter(cfg, set) if err != nil { return nil, err } @@ -119,7 +119,7 @@ func CreateMetricsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Metrics, error) { - oce, err := newExporter(cfg, set, ctx) + oce, err := newExporter(cfg, set) if err != nil { return nil, err } @@ -142,7 +142,7 @@ func CreateLogsExporter( set exporter.CreateSettings, cfg component.Config, ) (exporter.Logs, error) { - oce, err := newExporter(cfg, set, ctx) + oce, err := newExporter(cfg, set) if err != nil { return nil, err } diff --git a/agent/otel/otlpmqttexporter/otlp.go b/agent/otel/otlpmqttexporter/otlp.go index f2f0cb9f0..844aba621 100644 --- a/agent/otel/otlpmqttexporter/otlp.go +++ b/agent/otel/otlpmqttexporter/otlp.go @@ -15,9 +15,11 @@ import ( "github.com/andybalholm/brotli" mqtt "github.com/eclipse/paho.mqtt.golang" - "go.opentelemetry.io/collector/consumer/consumererror" - + "github.com/mitchellh/mapstructure" + "github.com/orb-community/orb/agent/policies" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/confmap" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/plog/plogotlp" @@ -53,7 +55,7 @@ func (e *baseExporter) compressBrotli(data []byte) []byte { } // Crete new exporter. -func newExporter(cfg component.Config, set exporter.CreateSettings, ctx context.Context) (*baseExporter, error) { +func newExporter(cfg component.Config, set exporter.CreateSettings) (*baseExporter, error) { oCfg := cfg.(*Config) if oCfg.Address != "" { _, err := url.Parse(oCfg.Address) @@ -110,24 +112,24 @@ func (e *baseExporter) injectScopeMetricsAttribute(metricsScope pmetric.ScopeMet switch metricItem.Type() { case pmetric.MetricTypeExponentialHistogram: - for i := 0; i < metricItem.ExponentialHistogram().DataPoints().Len(); i++ { - metricItem.ExponentialHistogram().DataPoints().At(i).Attributes().PutStr(attribute, value) + for j := 0; j < metricItem.ExponentialHistogram().DataPoints().Len(); j++ { + metricItem.ExponentialHistogram().DataPoints().At(j).Attributes().PutStr(attribute, value) } case pmetric.MetricTypeGauge: - for i := 0; i < metricItem.Gauge().DataPoints().Len(); i++ { - metricItem.Gauge().DataPoints().At(i).Attributes().PutStr(attribute, value) + for j := 0; j < metricItem.Gauge().DataPoints().Len(); j++ { + metricItem.Gauge().DataPoints().At(j).Attributes().PutStr(attribute, value) } case pmetric.MetricTypeHistogram: - for i := 0; i < metricItem.Histogram().DataPoints().Len(); i++ { - metricItem.Histogram().DataPoints().At(i).Attributes().PutStr(attribute, value) + for j := 0; j < metricItem.Histogram().DataPoints().Len(); j++ { + metricItem.Histogram().DataPoints().At(j).Attributes().PutStr(attribute, value) } case pmetric.MetricTypeSum: - for i := 0; i < metricItem.Sum().DataPoints().Len(); i++ { - metricItem.Sum().DataPoints().At(i).Attributes().PutStr(attribute, value) + for j := 0; j < metricItem.Sum().DataPoints().Len(); j++ { + metricItem.Sum().DataPoints().At(j).Attributes().PutStr(attribute, value) } case pmetric.MetricTypeSummary: - for i := 0; i < metricItem.Summary().DataPoints().Len(); i++ { - metricItem.Summary().DataPoints().At(i).Attributes().PutStr(attribute, value) + for j := 0; j < metricItem.Summary().DataPoints().Len(); j++ { + metricItem.Summary().DataPoints().At(j).Attributes().PutStr(attribute, value) } default: e.logger.Warn("not supported metric type", zap.String("name", metricItem.Name()), @@ -140,6 +142,52 @@ func (e *baseExporter) injectScopeMetricsAttribute(metricsScope pmetric.ScopeMet return metricsScope } +func (e *baseExporter) injectHTTPCheckLabels(metricsScope pmetric.ScopeMetrics, endpoint string, attribute string, value string) { + metrics := metricsScope.Metrics() + for i := 0; i < metrics.Len(); i++ { + metricItem := metrics.At(i) + + switch metricItem.Type() { + case pmetric.MetricTypeExponentialHistogram: + for j := 0; j < metricItem.ExponentialHistogram().DataPoints().Len(); j++ { + if v, ok := metricItem.ExponentialHistogram().DataPoints().At(j).Attributes().Get("http.url"); ok && v.AsString() == endpoint { + metricItem.ExponentialHistogram().DataPoints().At(j).Attributes().PutStr(attribute, value) + } + } + case pmetric.MetricTypeGauge: + for j := 0; j < metricItem.Gauge().DataPoints().Len(); j++ { + if v, ok := metricItem.Gauge().DataPoints().At(j).Attributes().Get("http.url"); ok && v.AsString() == endpoint { + metricItem.Gauge().DataPoints().At(j).Attributes().PutStr(attribute, value) + } + } + case pmetric.MetricTypeHistogram: + for j := 0; j < metricItem.Histogram().DataPoints().Len(); j++ { + if v, ok := metricItem.Histogram().DataPoints().At(j).Attributes().Get("http.url"); ok && v.AsString() == endpoint { + metricItem.Histogram().DataPoints().At(j).Attributes().PutStr(attribute, value) + } + } + case pmetric.MetricTypeSum: + for j := 0; j < metricItem.Sum().DataPoints().Len(); j++ { + if v, ok := metricItem.Sum().DataPoints().At(j).Attributes().Get("http.url"); ok && v.AsString() == endpoint { + metricItem.Sum().DataPoints().At(j).Attributes().PutStr(attribute, value) + } + } + case pmetric.MetricTypeSummary: + for j := 0; j < metricItem.Summary().DataPoints().Len(); j++ { + if v, ok := metricItem.Summary().DataPoints().At(j).Attributes().Get("http.url"); ok && v.AsString() == endpoint { + metricItem.Summary().DataPoints().At(j).Attributes().PutStr(attribute, value) + } + } + default: + e.logger.Warn("not supported metric type", zap.String("name", metricItem.Name()), + zap.String("type", metricItem.Type().String())) + metrics.RemoveIf(func(m pmetric.Metric) bool { + return m.Name() == metricItem.Name() + }) + } + } +} + // pushMetrics Exports metrics func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) error { tr := pmetricotlp.NewExportRequest() @@ -147,6 +195,7 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro scopes := pmetricotlp.NewExportRequestFromMetrics(md).Metrics().ResourceMetrics().At(0).ScopeMetrics() for i := 0; i < scopes.Len(); i++ { scope := scopes.At(i) + policyName, _ := scope.Scope().Attributes().Get("policy_name") policyNameStr := policyName.AsString() agentData, err := e.config.OrbAgentService.RetrieveAgentInfoByPolicyName(policyNameStr) @@ -155,6 +204,13 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro continue } + if scope.Scope().Name() == "otelcol/httpcheckreceiver" { + if err := e.enrichHTTPCheckMetrics(scope, policyNameStr); err != nil { + e.logger.Warn("failed to enrich httpcheck metrics", zap.Error(err)) + continue + } + } + // sort datasetIDs to send always on same order datasetIDs := strings.Split(agentData.Datasets, ",") sort.Strings(datasetIDs) @@ -302,9 +358,55 @@ func (e *baseExporter) export(ctx context.Context, topic string, request []byte) e.config.OrbAgentService.NotifyAgentDisconnection(ctx, token.Error()) return token.Error() } - e.logger.Debug("scraped and published telemetry", zap.String("topic", topic), + e.logger.Info("scraped and published telemetry", zap.String("topic", topic), zap.Int("payload_size_b", len(request)), zap.Int("compressed_payload_size_b", len(compressedPayload))) return nil } + +func (e *baseExporter) enrichHTTPCheckMetrics(scope pmetric.ScopeMetrics, policyName string) error { + policy, err := e.config.OrbAgentService.RetrievePolicyByName(policyName) + if err != nil { + e.logger.Warn("policy retrieval failed", zap.String("policyName", policyName), zap.Error(err)) + return err + } + + if policy.Backend == "otel" { + collectorConfig, err := ExtractCollectorConfig(policy) + if err != nil { + return err + } + + for key, receiver := range collectorConfig.Receivers { + switch key.Type() { + case "httpcheck": + var httpcheck HTTPCheckReceiver + if err := mapstructure.Decode(receiver, &httpcheck); err != nil { + return err + } + for _, target := range httpcheck.Targets { + for k, v := range target.Tags { + e.injectHTTPCheckLabels(scope, target.Endpoint, k, v) + } + } + } + } + } + + return nil +} + +func ExtractCollectorConfig(policyData policies.PolicyData) (*CollectorConfig, error) { + var config CollectorConfig + if policyData.Backend == "otel" { + configMap := confmap.NewFromStringMap(policyData.Data.(map[string]interface{})) + if err := configMap.Unmarshal(&config); err != nil { + return nil, err + } + + return &config, nil + } + + return &config, nil +} diff --git a/agent/policies/types.go b/agent/policies/types.go index 62565addb..d22aae4a5 100644 --- a/agent/policies/types.go +++ b/agent/policies/types.go @@ -17,6 +17,7 @@ type PolicyData struct { Name string Backend string Version int32 + Format string Data interface{} State PolicyState BackendErr string diff --git a/agent/policyMgr/manager.go b/agent/policyMgr/manager.go index fd1a2d72a..ac94c7b14 100644 --- a/agent/policyMgr/manager.go +++ b/agent/policyMgr/manager.go @@ -69,6 +69,7 @@ func (a *policyManager) ManagePolicy(payload fleet.AgentPolicyRPCPayload) { Name: payload.Name, Backend: payload.Backend, Version: payload.Version, + Format: payload.Format, Data: payload.Data, State: policies.Unknown, }