From 8b4dc3f7b78eceeace1fdc2c5589044dcbf50701 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Wed, 24 Apr 2024 00:01:00 +0530 Subject: [PATCH] =?UTF-8?q?[cmd/opampsupervisor]=20Persist=20collector=20r?= =?UTF-8?q?emote=20config=20&=20telemetry=20set=E2=80=A6=20(#30807)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …tings **Description:** Persists the configs contained in the AgentRemoteConfig and ConnectionSettings messages to be used on startup before any messages have been received from the OpAMP server. If the capability corresponding to the message for the config is disabled, then the config is not merged into the Collector's effective configuration **Link to tracking Issue:** Part of https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21078 **Testing:** Added e2e test **Documentation:** --- .../supervisor-persists-remote-config.yaml | 27 ++++ cmd/opampsupervisor/e2e_test.go | 100 +++++++++++- cmd/opampsupervisor/go.mod | 2 +- .../supervisor/config/config.go | 6 + cmd/opampsupervisor/supervisor/supervisor.go | 142 ++++++++++++++---- .../supervisor/supervisor_test.go | 22 ++- .../testdata/collector/effective_config.yaml | 14 ++ .../supervisor/supervisor_persistence.yaml | 17 +++ 8 files changed, 296 insertions(+), 34 deletions(-) create mode 100644 .chloggen/supervisor-persists-remote-config.yaml create mode 100644 cmd/opampsupervisor/testdata/supervisor/supervisor_persistence.yaml diff --git a/.chloggen/supervisor-persists-remote-config.yaml b/.chloggen/supervisor-persists-remote-config.yaml new file mode 100644 index 000000000000..c49fe0203589 --- /dev/null +++ b/.chloggen/supervisor-persists-remote-config.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Persist collector remote config & telemetry settings + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21078] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 0be9c6cfe532..3756bed93152 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -73,11 +73,13 @@ type testingOpAMPServer struct { addr string supervisorConnected chan bool sendToSupervisor func(*protobufs.ServerToAgent) + shutdown func() } func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { var agentConn atomic.Value var isAgentConnected atomic.Bool + var didShutdown atomic.Bool connectedChan := make(chan bool) s := server.New(testLogger{t: t}) onConnectedFunc := callbacks.OnConnectedFunc @@ -108,10 +110,14 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca httpSrv := httptest.NewServer(mux) shutdown := func() { - t.Log("Shutting down") - err := s.Stop(context.Background()) - assert.NoError(t, err) - httpSrv.Close() + if !didShutdown.Load() { + waitForSupervisorConnection(connectedChan, false) + t.Log("Shutting down") + err := s.Stop(context.Background()) + assert.NoError(t, err) + httpSrv.Close() + } + didShutdown.Store(true) } send := func(msg *protobufs.ServerToAgent) { if !isAgentConnected.Load() { @@ -121,13 +127,13 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca agentConn.Load().(types.Connection).Send(context.Background(), msg) } t.Cleanup(func() { - waitForSupervisorConnection(connectedChan, false) shutdown() }) return &testingOpAMPServer{ addr: httpSrv.Listener.Addr().String(), supervisorConnected: connectedChan, sendToSupervisor: send, + shutdown: shutdown, } } @@ -600,3 +606,87 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) { return connectedToNewServer.Load() == true }, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server") } + +func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { + // Create a temporary directory to store the test config file. + tempDir := t.TempDir() + + var agentConfig atomic.Value + initialServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "persistence", map[string]string{"url": initialServer.addr, "storage_dir": tempDir}) + + waitForSupervisorConnection(initialServer.supervisorConnected, true) + + cfg, hash, _, _ := createSimplePipelineCollectorConf(t) + + initialServer.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + require.Eventually(t, func() bool { + // Check if the config file was written to the storage directory + _, err := os.Stat(path.Join(tempDir, "last_recv_remote_config.dat")) + return err == nil + }, 5*time.Second, 250*time.Millisecond, "Config file was not written to persistent storage directory") + + agentConfig.Store("") + s.Shutdown() + initialServer.shutdown() + + newServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + return &protobufs.ServerToAgent{} + }, + }) + defer newServer.shutdown() + + s1 := newSupervisor(t, "persistence", map[string]string{"url": newServer.addr, "storage_dir": tempDir}) + defer s1.Shutdown() + + waitForSupervisorConnection(newServer.supervisorConnected, true) + + newServer.sendToSupervisor(&protobufs.ServerToAgent{ + Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState), + }) + + // Check that the new Supervisor instance starts with the configuration from the last received remote config + require.Eventually(t, func() bool { + loadedConfig, ok := agentConfig.Load().(string) + if !ok { + return false + } + + return strings.Contains(loadedConfig, "filelog") + }, 10*time.Second, 500*time.Millisecond, "Collector was not started with the last received remote config") + +} diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index 7da321ec7748..eb1ef11584bd 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -16,6 +16,7 @@ require ( go.opentelemetry.io/collector/semconv v0.99.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + google.golang.org/protobuf v1.33.0 ) require ( @@ -31,6 +32,5 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index ff020f00db0c..fc353dca4a0a 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -14,6 +14,12 @@ type Supervisor struct { Server *OpAMPServer Agent *Agent Capabilities *Capabilities `mapstructure:"capabilities"` + Storage *Storage `mapstructure:"storage"` +} + +type Storage struct { + // Directory is the directory where the Supervisor will store its data. + Directory string `mapstructure:"directory"` } // Capabilities is the set of capabilities that the Supervisor supports. diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 1aa757a54290..339c05ecf2b2 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -13,6 +13,7 @@ import ( "net" "net/http" "os" + "path/filepath" "sort" "sync" "sync/atomic" @@ -34,6 +35,7 @@ import ( "go.opentelemetry.io/collector/config/configtls" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/commander" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" @@ -49,6 +51,9 @@ var ( //go:embed templates/owntelemetry.yaml ownTelemetryTpl string + + lastRecvRemoteConfigFile = "last_recv_remote_config.dat" + lastRecvOwnMetricsConfigFile = "last_recv_own_metrics_config.dat" ) // Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient @@ -141,13 +146,13 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } - port, err := s.findRandomPort() + healthCheckPort, err := s.findRandomPort() if err != nil { return nil, fmt.Errorf("could not find port for health check: %w", err) } - s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", port) + s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort) logger.Debug("Supervisor starting", zap.String("id", s.instanceID.String())) @@ -159,7 +164,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { } if connErr := s.waitForOpAMPConnection(); connErr != nil { - return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err) + return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", connErr) } s.commander, err = commander.NewCommander( @@ -547,9 +552,10 @@ func (s *Supervisor) composeExtraLocalConfig() []byte { } func (s *Supervisor) loadAgentEffectiveConfig() { - var effectiveConfigBytes []byte + var effectiveConfigBytes, effFromFile, lastRecvRemoteConfig, lastRecvOwnMetricsConfig []byte + var err error - effFromFile, err := os.ReadFile(s.effectiveConfigFilePath) + effFromFile, err = os.ReadFile(s.effectiveConfigFilePath) if err == nil { // We have an effective config file. effectiveConfigBytes = effFromFile @@ -559,6 +565,50 @@ func (s *Supervisor) loadAgentEffectiveConfig() { } s.effectiveConfig.Store(string(effectiveConfigBytes)) + + if s.config.Capabilities != nil && s.config.Capabilities.AcceptsRemoteConfig != nil && + *s.config.Capabilities.AcceptsRemoteConfig && + s.config.Storage != nil { + // Try to load the last received remote config if it exists. + lastRecvRemoteConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile)) + if err == nil { + config := &protobufs.AgentRemoteConfig{} + err = proto.Unmarshal(lastRecvRemoteConfig, config) + if err != nil { + s.logger.Error("Cannot parse last received remote config", zap.Error(err)) + } else { + s.remoteConfig = config + } + } else { + s.logger.Error("error while reading last received config", zap.Error(err)) + } + } else { + s.logger.Debug("Remote config is not supported, will not attempt to load config from fil") + } + + if s.config.Capabilities != nil && s.config.Capabilities.ReportsOwnMetrics != nil && + *s.config.Capabilities.ReportsOwnMetrics && + s.config.Storage != nil { + // Try to load the last received own metrics config if it exists. + lastRecvOwnMetricsConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvOwnMetricsConfigFile)) + if err == nil { + set := &protobufs.TelemetryConnectionSettings{} + err = proto.Unmarshal(lastRecvOwnMetricsConfig, set) + if err != nil { + s.logger.Error("Cannot parse last received own metrics config", zap.Error(err)) + } else { + s.setupOwnMetrics(context.Background(), set) + } + } + } else { + s.logger.Debug("Own metrics is not supported, will not attempt to load config from file") + } + + _, err = s.recalcEffectiveConfig() + if err != nil { + s.logger.Error("Error composing effective config. Ignoring received config", zap.Error(err)) + return + } } // createEffectiveConfigMsg create an EffectiveConfig with the content of the @@ -624,6 +674,7 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele // 2) the own metrics config section // 3) the local override config that is hard-coded in the Supervisor. func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) { + var k = koanf.New(".") // Begin with empty config. We will merge received configs on top of it. @@ -631,32 +682,37 @@ func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) return false, err } - // Sort to make sure the order of merging is stable. - var names []string - for name := range config.Config.ConfigMap { - if name == "" { - // skip instance config - continue + if config != nil && config.Config != nil { + // Sort to make sure the order of merging is stable. + var names []string + for name := range config.Config.ConfigMap { + if name == "" { + // skip instance config + continue + } + names = append(names, name) } - names = append(names, name) - } - sort.Strings(names) + sort.Strings(names) - // Append instance config as the last item. - names = append(names, "") + // Append instance config as the last item. + names = append(names, "") - // Merge received configs. - for _, name := range names { - item := config.Config.ConfigMap[name] - var k2 = koanf.New(".") - err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser()) - if err != nil { - return false, fmt.Errorf("cannot parse config named %s: %w", name, err) - } - err = k.Merge(k2) - if err != nil { - return false, fmt.Errorf("cannot merge config named %s: %w", name, err) + // Merge received configs. + for _, name := range names { + item := config.Config.ConfigMap[name] + if item == nil { + continue + } + var k2 = koanf.New(".") + err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser()) + if err != nil { + return false, fmt.Errorf("cannot parse config named %s: %w", name, err) + } + err = k.Merge(k2) + if err != nil { + return false, fmt.Errorf("cannot merge config named %s: %w", name, err) + } } } @@ -910,9 +966,38 @@ func (s *Supervisor) Shutdown() { s.supervisorWG.Wait() } +func (s *Supervisor) saveLastReceivedConfig(config *protobufs.AgentRemoteConfig) error { + if s.config.Storage == nil { + return nil + } + + cfg, err := proto.Marshal(config) + if err != nil { + return err + } + + return os.WriteFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile), cfg, 0600) +} + +func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.TelemetryConnectionSettings, filePath string) error { + if s.config.Storage == nil { + return nil + } + + cfg, err := proto.Marshal(set) + if err != nil { + return err + } + + return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0600) +} + func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { configChanged := false if msg.RemoteConfig != nil { + if err := s.saveLastReceivedConfig(msg.RemoteConfig); err != nil { + s.logger.Error("Could not save last received remote config", zap.Error(err)) + } s.remoteConfig = msg.RemoteConfig s.logger.Debug("Received remote config from server", zap.String("hash", fmt.Sprintf("%x", s.remoteConfig.ConfigHash))) @@ -939,6 +1024,9 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { } if msg.OwnMetricsConnSettings != nil { + if err := s.saveLastReceivedOwnTelemetrySettings(msg.OwnMetricsConnSettings, lastRecvOwnMetricsConfigFile); err != nil { + s.logger.Error("Could not save last received own telemetry settings", zap.Error(err)) + } configChanged = s.setupOwnMetrics(ctx, msg.OwnMetricsConnSettings) || configChanged } diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index f247b169907a..2ac31db6c10e 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -12,11 +12,15 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/require" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" ) func Test_composeEffectiveConfig(t *testing.T) { + acceptsRemoteConfig := true s := Supervisor{ logger: zap.NewNop(), + config: config.Supervisor{Capabilities: &config.Capabilities{AcceptsRemoteConfig: &acceptsRemoteConfig}}, hasNewConfig: make(chan struct{}, 1), effectiveConfigFilePath: "effective.yaml", agentConfigOwnMetricsSection: &atomic.Value{}, @@ -37,6 +41,22 @@ func Test_composeEffectiveConfig(t *testing.T) { }, } + fileLogConfig := ` +receivers: + filelog: + include: ['/test/logs/input.log'] + start_at: "beginning" + +exporters: + file: + path: '/test/logs/output.log' + +service: + pipelines: + logs: + receivers: [filelog] + exporters: [file]` + require.NoError(t, s.createTemplates()) s.loadAgentEffectiveConfig() @@ -44,7 +64,7 @@ func Test_composeEffectiveConfig(t *testing.T) { Config: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ "": { - Body: []byte(""), + Body: []byte(fileLogConfig), }, }, }, diff --git a/cmd/opampsupervisor/testdata/collector/effective_config.yaml b/cmd/opampsupervisor/testdata/collector/effective_config.yaml index 58d6940011b6..5573cc570593 100644 --- a/cmd/opampsupervisor/testdata/collector/effective_config.yaml +++ b/cmd/opampsupervisor/testdata/collector/effective_config.yaml @@ -1,9 +1,23 @@ +exporters: + file: + path: /test/logs/output.log extensions: health_check: endpoint: localhost:8000 +receivers: + filelog: + include: + - /test/logs/input.log + start_at: beginning service: extensions: - health_check + pipelines: + logs: + exporters: + - file + receivers: + - filelog telemetry: logs: encoding: json diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_persistence.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_persistence.yaml new file mode 100644 index 000000000000..7595f758f851 --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_persistence.yaml @@ -0,0 +1,17 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + +storage: + directory: {{.storage_dir}} + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}