From ad8690749eb7b1b37a0c73c3cf40c41765598217 Mon Sep 17 00:00:00 2001 From: Evan Bradley <11745660+evan-bradley@users.noreply.github.com> Date: Fri, 12 Jan 2024 15:15:55 -0500 Subject: [PATCH] [cmd/opampsupervisor] Implement Collector bootstrapping (#29848) **Description:** Utilize the OpAMP extension to get identifying attributes from the Collector. A few things I want to call out: * I moved the Supervisor's various config fragments into separate files that are embedded into the binary. I think this makes them easier to edit. I can also move the changes for the existing fragments to a separate PR if it adds too much to the diff. * I opted to use the OTLP receiver instead of the filelog receiver because it is included in both existing upstream distributions and I expect it is slightly more common. Ideally we should look at other approaches to solve this. **Link to tracking Issue:** Resolves https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21071 **Testing:** Added an integration test. --------- Co-authored-by: Antoine Toulme Co-authored-by: Evan Bradley --- .chloggen/supervisor-bootstrapping.yaml | 27 ++ cmd/opampsupervisor/e2e_test.go | 92 +++++- cmd/opampsupervisor/go.mod | 1 + cmd/opampsupervisor/go.sum | 2 + cmd/opampsupervisor/specification/README.md | 4 +- cmd/opampsupervisor/supervisor/server.go | 45 +++ cmd/opampsupervisor/supervisor/supervisor.go | 293 ++++++++++++------ .../supervisor/supervisor_test.go | 58 ++++ .../supervisor/templates/bootstrap.yaml | 24 ++ .../supervisor/templates/extraconfig.yaml | 16 + .../supervisor/templates/owntelemetry.yaml | 21 ++ .../testdata/collector/effective_config.yaml | 11 + .../testdata/supervisor/supervisor_test.yaml | 14 + 13 files changed, 500 insertions(+), 108 deletions(-) create mode 100755 .chloggen/supervisor-bootstrapping.yaml create mode 100644 cmd/opampsupervisor/supervisor/server.go create mode 100644 cmd/opampsupervisor/supervisor/supervisor_test.go create mode 100644 cmd/opampsupervisor/supervisor/templates/bootstrap.yaml create mode 100644 cmd/opampsupervisor/supervisor/templates/extraconfig.yaml create mode 100644 cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml create mode 100644 cmd/opampsupervisor/testdata/collector/effective_config.yaml create mode 100644 cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml diff --git a/.chloggen/supervisor-bootstrapping.yaml b/.chloggen/supervisor-bootstrapping.yaml new file mode 100755 index 000000000000..962ee4799413 --- /dev/null +++ b/.chloggen/supervisor-bootstrapping.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Use a bootstrapping flow to get the Collector's agent description. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21071] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index a9be00a7e4d1..a569ac747b65 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -15,6 +15,7 @@ import ( "net/http" "net/http/httptest" "os" + "os/exec" "path" "runtime" "strings" @@ -23,12 +24,18 @@ import ( "text/template" "time" + "github.com/knadh/koanf/parsers/yaml" + "github.com/knadh/koanf/providers/file" + "github.com/knadh/koanf/providers/rawbytes" + "github.com/knadh/koanf/v2" "github.com/open-telemetry/opamp-go/protobufs" "github.com/open-telemetry/opamp-go/server" "github.com/open-telemetry/opamp-go/server/types" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" ) @@ -122,6 +129,14 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca } func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor { + cfgFile := getSupervisorConfig(t, configType, extraConfigData) + s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) + require.NoError(t, err) + + return s +} + +func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[string]string) *os.File { tpl, err := os.ReadFile(path.Join("testdata", "supervisor", "supervisor_"+configType+".yaml")) require.NoError(t, err) @@ -148,10 +163,7 @@ func newSupervisor(t *testing.T, configType string, extraConfigData map[string]s _, err = cfgFile.Write(buf.Bytes()) require.NoError(t, err) - s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) - require.NoError(t, err) - - return s + return cfgFile } func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { @@ -323,6 +335,78 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) { }, 5*time.Second, 250*time.Millisecond) } +func TestSupervisorBootstrapsCollector(t *testing.T) { + agentDescription := atomic.Value{} + + // Load the Supervisor config so we can get the location of + // the Collector that will be run. + var cfg config.Supervisor + cfgFile := getSupervisorConfig(t, "nocap", map[string]string{}) + k := koanf.New("::") + err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser()) + require.NoError(t, err) + err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{ + Tag: "mapstructure", + }) + require.NoError(t, err) + + // Get the binary name and version from the Collector binary + // using the `components` command that prints a YAML-encoded + // map of information about the Collector build. Some of this + // information will be used as defaults for the telemetry + // attributes. + agentPath := cfg.Agent.Executable + componentsInfo, err := exec.Command(agentPath, "components").Output() + require.NoError(t, err) + k = koanf.New("::") + err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser()) + require.NoError(t, err) + buildinfo := k.StringMap("buildinfo") + command := buildinfo["command"] + version := buildinfo["version"] + + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.AgentDescription != nil { + agentDescription.Store(message.AgentDescription) + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "nocap", map[string]string{"url": server.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + require.Eventually(t, func() bool { + ad, ok := agentDescription.Load().(*protobufs.AgentDescription) + if !ok { + return false + } + + var agentName, agentVersion string + identAttr := ad.IdentifyingAttributes + for _, attr := range identAttr { + switch attr.Key { + case semconv.AttributeServiceName: + agentName = attr.Value.GetStringValue() + case semconv.AttributeServiceVersion: + agentVersion = attr.Value.GetStringValue() + } + } + + // By default the Collector should report its name and version + // from the component.BuildInfo struct built into the Collector + // binary. + return agentName == command && agentVersion == version + }, 5*time.Second, 250*time.Millisecond) +} + // Creates a Collector config that reads and writes logs to files and provides // file descriptors for I/O operations to those files. The files are placed // in a unique temp directory that is cleaned up after the test's completion. diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index 66c9b36ff5a0..ddb7d9477141 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -12,6 +12,7 @@ require ( github.com/open-telemetry/opamp-go v0.10.0 github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/config/configtls v0.92.1-0.20240110091511-bf804d6c4ecc + go.opentelemetry.io/collector/semconv v0.92.1-0.20240110091511-bf804d6c4ecc go.uber.org/zap v1.26.0 ) diff --git a/cmd/opampsupervisor/go.sum b/cmd/opampsupervisor/go.sum index aae4b96dba47..2356f1901bcb 100644 --- a/cmd/opampsupervisor/go.sum +++ b/cmd/opampsupervisor/go.sum @@ -41,6 +41,8 @@ go.opentelemetry.io/collector/config/configopaque v0.92.1-0.20240110091511-bf804 go.opentelemetry.io/collector/config/configopaque v0.92.1-0.20240110091511-bf804d6c4ecc/go.mod h1:dQK8eUXjIGKaw1RB7UIg2nqx56AueNxeKFCdB0P1ypg= go.opentelemetry.io/collector/config/configtls v0.92.1-0.20240110091511-bf804d6c4ecc h1:Md5yyTFQq2uvrVcJuRXQCtIEdJL7VyDxGanWJbzUIDA= go.opentelemetry.io/collector/config/configtls v0.92.1-0.20240110091511-bf804d6c4ecc/go.mod h1:rL9BH5Hyrkni4t+QOx/opuwD0CHq/ZIFTsh6QLLsbmA= +go.opentelemetry.io/collector/semconv v0.92.1-0.20240110091511-bf804d6c4ecc h1:cfooQkd6CO0Q8HLeTFEuYvcN8UjjPsuRJtxmp5OZBA8= +go.opentelemetry.io/collector/semconv v0.92.1-0.20240110091511-bf804d6c4ecc/go.mod h1:gZ0uzkXsN+J5NpiRcdp9xOhNGQDDui8Y62p15sKrlzo= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= diff --git a/cmd/opampsupervisor/specification/README.md b/cmd/opampsupervisor/specification/README.md index 909ca86f58e5..e72656b0a9ae 100644 --- a/cmd/opampsupervisor/specification/README.md +++ b/cmd/opampsupervisor/specification/README.md @@ -220,8 +220,8 @@ configuration. To overcome this problem the Supervisor starts the Collector with an "noop" configuration that collects nothing but allows the opamp extension to be started. The "noop" configuration is a single pipeline -with a filelog receiver that points to a non-existing file and a logging -exporter and the opamp extension. The purpose of the "noop" +with an OTLP receiver that listens on a random port and a debug +exporter, and the opamp extension. The purpose of the "noop" configuration is to make sure the Collector starts and the opamp extension communicates with the Supervisor. diff --git a/cmd/opampsupervisor/supervisor/server.go b/cmd/opampsupervisor/supervisor/server.go new file mode 100644 index 000000000000..efc7ba5e154d --- /dev/null +++ b/cmd/opampsupervisor/supervisor/server.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package supervisor + +import ( + "net/http" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server" + serverTypes "github.com/open-telemetry/opamp-go/server/types" +) + +type flattenedSettings struct { + onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer) + onConnectingFunc func(request *http.Request) + endpoint string +} + +func newServerSettings(fs flattenedSettings) server.StartSettings { + return server.StartSettings{ + Settings: server.Settings{ + Callbacks: server.CallbacksStruct{ + OnConnectingFunc: func(request *http.Request) serverTypes.ConnectionResponse { + if fs.onConnectingFunc != nil { + fs.onConnectingFunc(request) + } + return serverTypes.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: server.ConnectionCallbacksStruct{ + OnMessageFunc: func(conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if fs.onMessageFunc != nil { + fs.onMessageFunc(conn, message) + } + + return &protobufs.ServerToAgent{} + }, + }, + } + }, + }, + }, + ListenEndpoint: fs.endpoint, + } +} diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 3e8355040c4d..83a565f07a9f 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -4,15 +4,18 @@ package supervisor import ( + "bytes" "context" + _ "embed" "errors" "fmt" "math/rand" "net" + "net/http" "os" - "runtime" "sort" "sync/atomic" + "text/template" "time" "github.com/cenkalti/backoff/v4" @@ -24,6 +27,9 @@ import ( "github.com/open-telemetry/opamp-go/client" "github.com/open-telemetry/opamp-go/client/types" "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server" + serverTypes "github.com/open-telemetry/opamp-go/server/types" + semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/commander" @@ -31,8 +37,16 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/healthchecker" ) -// This Supervisor is developed specifically for the OpenTelemetry Collector. -const agentType = "io.opentelemetry.collector" +var ( + //go:embed templates/bootstrap.yaml + bootstrapConfTpl string + + //go:embed templates/extraconfig.yaml + extraConfigTpl string + + //go:embed templates/owntelemetry.yaml + ownTelemetryTpl string +) // Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient // to work with an OpAMP Server. @@ -51,11 +65,14 @@ type Supervisor struct { // Supervisor's own config. config config.Supervisor + agentDescription *protobufs.AgentDescription + // Agent's instance id. instanceID ulid.ULID - // The version of the agent. - agentVersion string + bootstrapTemplate *template.Template + extraConfigTemplate *template.Template + ownTelemetryTemplate *template.Template // A config section to be added to the Collector's config to fetch its own metrics. // TODO: store this persistently so that when starting we can compose the effective @@ -97,32 +114,35 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { effectiveConfig: &atomic.Value{}, } + if err := s.createTemplates(); err != nil { + return nil, err + } + if err := s.loadConfig(configFile); err != nil { return nil, fmt.Errorf("error loading config: %w", err) } - if err := s.getBootstrapInfo(); err != nil { - s.logger.Error("Couldn't get agent version", zap.Error(err)) + id, err := s.createInstanceID() + if err != nil { + return nil, err } - port, err := s.findRandomPort() + s.instanceID = id - if err != nil { - return nil, fmt.Errorf("could not find port for health check: %w", err) + if err = s.getBootstrapInfo(); err != nil { + return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } - s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", port) - - id, err := s.createInstanceID() + port, err := s.findRandomPort() if err != nil { - return nil, err + return nil, fmt.Errorf("could not find port for health check: %w", err) } - s.instanceID = id + s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", port) logger.Debug("Supervisor starting", - zap.String("id", s.instanceID.String()), zap.String("type", agentType), zap.String("version", s.agentVersion)) + zap.String("id", s.instanceID.String())) s.loadAgentEffectiveConfig() @@ -145,6 +165,22 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return s, nil } +func (s *Supervisor) createTemplates() error { + var err error + + if s.bootstrapTemplate, err = template.New("bootstrap").Parse(bootstrapConfTpl); err != nil { + return err + } + if s.extraConfigTemplate, err = template.New("extraconfig").Parse(extraConfigTpl); err != nil { + return err + } + if s.ownTelemetryTemplate, err = template.New("owntelemetry").Parse(ownTelemetryTpl); err != nil { + return err + } + + return nil +} + func (s *Supervisor) loadConfig(configFile string) error { if configFile == "" { return errors.New("path to config file cannot be empty") @@ -166,10 +202,109 @@ func (s *Supervisor) loadConfig(configFile string) error { return nil } -// TODO: Implement bootstrapping https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21071 -// nolint: unparam func (s *Supervisor) getBootstrapInfo() (err error) { - s.agentVersion = "1.0.0" + port, err := s.findRandomPort() + if err != nil { + return err + } + + supervisorPort, err := s.findRandomPort() + if err != nil { + return err + } + + var cfg bytes.Buffer + + err = s.bootstrapTemplate.Execute(&cfg, map[string]any{ + "EndpointPort": port, + "InstanceUid": s.instanceID.String(), + "SupervisorPort": supervisorPort, + }) + if err != nil { + return err + } + + s.writeEffectiveConfigToFile(cfg.String(), s.effectiveConfigFilePath) + + srv := server.New(s.logger.Sugar()) + + done := make(chan error, 1) + var connected atomic.Bool + + err = srv.Start(newServerSettings(flattenedSettings{ + endpoint: fmt.Sprintf("localhost:%d", supervisorPort), + onConnectingFunc: func(request *http.Request) { + connected.Store(true) + + }, + onMessageFunc: func(_ serverTypes.Connection, message *protobufs.AgentToServer) { + if message.AgentDescription != nil { + instanceIDSeen := false + s.agentDescription = message.AgentDescription + identAttr := s.agentDescription.IdentifyingAttributes + + for _, attr := range identAttr { + if attr.Key == semconv.AttributeServiceInstanceID { + // TODO: Consider whether to attempt restarting the Collector. + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29864 + if attr.Value.GetStringValue() != s.instanceID.String() { + done <- fmt.Errorf( + "the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s)", + attr.Value.GetStringValue(), + s.instanceID.String()) + return + } + instanceIDSeen = true + } + } + + if !instanceIDSeen { + done <- errors.New("the Collector did not specify an instance ID in its AgentDescription message") + return + } + + done <- nil + } + }, + })) + if err != nil { + return err + } + + cmd, err := commander.NewCommander( + s.logger, + s.config.Agent, + "--config", s.effectiveConfigFilePath, + ) + if err != nil { + return err + } + + if err = cmd.Start(context.Background()); err != nil { + return err + } + + select { + // TODO make timeout configurable + case <-time.After(3 * time.Second): + if connected.Load() { + return errors.New("collector connected but never responded with an AgentDescription message") + } else { + return errors.New("collector's OpAMP client never connected to the Supervisor") + } + case err = <-done: + if err != nil { + return err + } + } + + if err = cmd.Stop(context.Background()); err != nil { + return err + } + + if err = srv.Stop(context.Background()); err != nil { + return err + } return nil } @@ -252,7 +387,7 @@ func (s *Supervisor) startOpAMP() error { }, Capabilities: s.Capabilities(), } - err = s.opampClient.SetAgentDescription(s.createAgentDescription()) + err = s.opampClient.SetAgentDescription(s.agentDescription) if err != nil { return err } @@ -287,57 +422,29 @@ func (s *Supervisor) createInstanceID() (ulid.ULID, error) { } -func keyVal(key, val string) *protobufs.KeyValue { - return &protobufs.KeyValue{ - Key: key, - Value: &protobufs.AnyValue{ - Value: &protobufs.AnyValue_StringValue{StringValue: val}, - }, +func (s *Supervisor) composeExtraLocalConfig() []byte { + var cfg bytes.Buffer + resourceAttrs := map[string]string{} + for _, attr := range s.agentDescription.IdentifyingAttributes { + resourceAttrs[attr.Key] = attr.Value.GetStringValue() } -} - -func (s *Supervisor) createAgentDescription() *protobufs.AgentDescription { - hostname, _ := os.Hostname() - - return &protobufs.AgentDescription{ - IdentifyingAttributes: []*protobufs.KeyValue{ - keyVal("service.name", agentType), - keyVal("service.version", s.agentVersion), - keyVal("service.instance.id", s.instanceID.String()), - }, - NonIdentifyingAttributes: []*protobufs.KeyValue{ - keyVal("os.family", runtime.GOOS), - keyVal("host.name", hostname), - }, + for _, attr := range s.agentDescription.NonIdentifyingAttributes { + resourceAttrs[attr.Key] = attr.Value.GetStringValue() } -} - -func (s *Supervisor) composeExtraLocalConfig() string { - return fmt.Sprintf(` -service: - telemetry: - logs: - # Enables JSON log output for the Agent. - encoding: json - resource: - # Set resource attributes required by OpAMP spec. - # See https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agentdescriptionidentifying_attributes - service.name: %s - service.version: %s - service.instance.id: %s - - # Enable extension to allow the Supervisor to check health. - extensions: [health_check] - -extensions: - health_check: - endpoint: %s -`, - agentType, - s.agentVersion, - s.instanceID.String(), - s.agentHealthCheckEndpoint, + tplVars := map[string]any{ + "Healthcheck": s.agentHealthCheckEndpoint, + "ResourceAttributes": resourceAttrs, + } + err := s.extraConfigTemplate.Execute( + &cfg, + tplVars, ) + if err != nil { + s.logger.Error("Could not compose local config", zap.Error(err)) + return nil + } + + return cfg.Bytes() } func (s *Supervisor) loadAgentEffectiveConfig() { @@ -349,7 +456,7 @@ func (s *Supervisor) loadAgentEffectiveConfig() { effectiveConfigBytes = effFromFile } else { // No effective config file, just use the initial config. - effectiveConfigBytes = []byte(s.composeExtraLocalConfig()) + effectiveConfigBytes = s.composeExtraLocalConfig() } s.effectiveConfig.Store(string(effectiveConfigBytes)) @@ -375,11 +482,10 @@ func (s *Supervisor) createEffectiveConfigMsg() *protobufs.EffectiveConfig { } func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.TelemetryConnectionSettings) (configChanged bool) { - var cfg string + var cfg bytes.Buffer if settings.DestinationEndpoint == "" { // No destination. Disable metric collection. s.logger.Debug("Disabling own metrics pipeline in the config") - cfg = "" } else { s.logger.Debug("Enabling own metrics pipeline in the config") @@ -390,37 +496,20 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele return } - cfg = fmt.Sprintf( - ` -receivers: - # Collect own metrics - prometheus/own_metrics: - config: - scrape_configs: - - job_name: 'otel-collector' - scrape_interval: 10s - static_configs: - - targets: ['0.0.0.0:%d'] -exporters: - otlphttp/own_metrics: - metrics_endpoint: %s - -service: - telemetry: - metrics: - address: :%d - pipelines: - metrics/own_metrics: - receivers: [prometheus/own_metrics] - exporters: [otlphttp/own_metrics] -`, - port, - settings.DestinationEndpoint, - port, + err = s.ownTelemetryTemplate.Execute( + &cfg, + map[string]any{ + "PrometheusPort": port, + "MetricsEndpoint": settings.DestinationEndpoint, + }, ) - } + if err != nil { + s.logger.Error("Could not setup own metrics", zap.Error(err)) + return + } - s.agentConfigOwnMetricsSection.Store(cfg) + } + s.agentConfigOwnMetricsSection.Store(cfg.String()) // Need to recalculate the Agent config so that the metric config is included in it. configChanged, err := s.recalcEffectiveConfig() @@ -481,7 +570,7 @@ func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) } // Merge local config last since it has the highest precedence. - if err = k.Load(rawbytes.Provider([]byte(s.composeExtraLocalConfig())), yaml.Parser()); err != nil { + if err = k.Load(rawbytes.Provider(s.composeExtraLocalConfig()), yaml.Parser()); err != nil { return false, err } @@ -737,7 +826,7 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { zap.String("old_id", s.instanceID.String()), zap.String("new_id", newInstanceID.String())) s.instanceID = newInstanceID - err = s.opampClient.SetAgentDescription(s.createAgentDescription()) + err = s.opampClient.SetAgentDescription(s.agentDescription) if err != nil { s.logger.Error("Failed to send agent description to OpAMP server") } diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go new file mode 100644 index 000000000000..2e7cafab11af --- /dev/null +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -0,0 +1,58 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package supervisor + +import ( + "os" + "sync/atomic" + "testing" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func Test_composeEffectiveConfig(t *testing.T) { + s := Supervisor{ + logger: zap.NewNop(), + hasNewConfig: make(chan struct{}, 1), + effectiveConfigFilePath: "effective.yaml", + agentConfigOwnMetricsSection: &atomic.Value{}, + effectiveConfig: &atomic.Value{}, + agentHealthCheckEndpoint: "localhost:8000", + } + + s.agentDescription = &protobufs.AgentDescription{ + IdentifyingAttributes: []*protobufs.KeyValue{ + { + Key: "service.name", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{ + StringValue: "otelcol", + }, + }, + }, + }, + } + + require.NoError(t, s.createTemplates()) + s.loadAgentEffectiveConfig() + + configChanged, err := s.composeEffectiveConfig(&protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": { + Body: []byte(""), + }, + }, + }, + }) + require.NoError(t, err) + + expectedConfig, err := os.ReadFile("../testdata/collector/effective_config.yaml") + require.NoError(t, err) + + require.True(t, configChanged) + require.Equal(t, string(expectedConfig), s.effectiveConfig.Load().(string)) +} diff --git a/cmd/opampsupervisor/supervisor/templates/bootstrap.yaml b/cmd/opampsupervisor/supervisor/templates/bootstrap.yaml new file mode 100644 index 000000000000..983b62c4af59 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/templates/bootstrap.yaml @@ -0,0 +1,24 @@ +receivers: + otlp: + protocols: + http: + endpoint: "localhost:{{.EndpointPort}}" +exporters: + debug: + verbosity: basic + +extensions: + opamp: + instance_uid: "{{.InstanceUid}}" + server: + ws: + endpoint: "ws://localhost:{{.SupervisorPort}}/v1/opamp" + tls: + insecure: true + +service: + pipelines: + traces: + receivers: [otlp] + exporters: [debug] + extensions: [opamp] diff --git a/cmd/opampsupervisor/supervisor/templates/extraconfig.yaml b/cmd/opampsupervisor/supervisor/templates/extraconfig.yaml new file mode 100644 index 000000000000..910403f3ba44 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/templates/extraconfig.yaml @@ -0,0 +1,16 @@ +service: + telemetry: + logs: + # Enables JSON log output for the Agent. + encoding: json + resource: + # Set resource attributes suggested by the OpAMP spec. + # See https://github.com/open-telemetry/opamp-spec/blob/main/specification.md#agentdescription-message + {{range $k, $v := .ResourceAttributes}}{{$k}}: "{{$v}}" + {{end}} + # Enable extension to allow the Supervisor to check health. + extensions: [health_check] + +extensions: + health_check: + endpoint: "{{.Healthcheck}}" diff --git a/cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml b/cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml new file mode 100644 index 000000000000..f46851298780 --- /dev/null +++ b/cmd/opampsupervisor/supervisor/templates/owntelemetry.yaml @@ -0,0 +1,21 @@ +receivers: + # Collect own metrics + prometheus/own_metrics: + config: + scrape_configs: + - job_name: 'otel-collector' + scrape_interval: 10s + static_configs: + - targets: ['0.0.0.0:{{.PrometheusPort}}'] +exporters: + otlphttp/own_metrics: + metrics_endpoint: "{{.MetricsEndpoint}}" + +service: + telemetry: + metrics: + address: ":{{.PrometheusPort}}" + pipelines: + metrics/own_metrics: + receivers: [prometheus/own_metrics] + exporters: [otlphttp/own_metrics] diff --git a/cmd/opampsupervisor/testdata/collector/effective_config.yaml b/cmd/opampsupervisor/testdata/collector/effective_config.yaml new file mode 100644 index 000000000000..58d6940011b6 --- /dev/null +++ b/cmd/opampsupervisor/testdata/collector/effective_config.yaml @@ -0,0 +1,11 @@ +extensions: + health_check: + endpoint: localhost:8000 +service: + extensions: + - health_check + telemetry: + logs: + encoding: json + resource: + service.name: otelcol diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml new file mode 100644 index 000000000000..bdcdc2e72c93 --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_test.yaml @@ -0,0 +1,14 @@ +server: + endpoint: wss://127.0.0.1:4320/v1/opamp + tls: + insecure_skip_verify: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + +agent: + executable: ../../bin/otelcontribcol_darwin_arm64