diff --git a/.chloggen/supervisor-persists-remote-config.yaml b/.chloggen/supervisor-persists-remote-config.yaml new file mode 100644 index 000000000000..c49fe0203589 --- /dev/null +++ b/.chloggen/supervisor-persists-remote-config.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: cmd/opampsupervisor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Persist collector remote config & telemetry settings + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [21078] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 0be9c6cfe532..3756bed93152 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -73,11 +73,13 @@ type testingOpAMPServer struct { addr string supervisorConnected chan bool sendToSupervisor func(*protobufs.ServerToAgent) + shutdown func() } func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { var agentConn atomic.Value var isAgentConnected atomic.Bool + var didShutdown atomic.Bool connectedChan := make(chan bool) s := server.New(testLogger{t: t}) onConnectedFunc := callbacks.OnConnectedFunc @@ -108,10 +110,14 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca httpSrv := httptest.NewServer(mux) shutdown := func() { - t.Log("Shutting down") - err := s.Stop(context.Background()) - assert.NoError(t, err) - httpSrv.Close() + if !didShutdown.Load() { + waitForSupervisorConnection(connectedChan, false) + t.Log("Shutting down") + err := s.Stop(context.Background()) + assert.NoError(t, err) + httpSrv.Close() + } + didShutdown.Store(true) } send := func(msg *protobufs.ServerToAgent) { if !isAgentConnected.Load() { @@ -121,13 +127,13 @@ func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, ca agentConn.Load().(types.Connection).Send(context.Background(), msg) } t.Cleanup(func() { - waitForSupervisorConnection(connectedChan, false) shutdown() }) return &testingOpAMPServer{ addr: httpSrv.Listener.Addr().String(), supervisorConnected: connectedChan, sendToSupervisor: send, + shutdown: shutdown, } } @@ -600,3 +606,87 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) { return connectedToNewServer.Load() == true }, 10*time.Second, 500*time.Millisecond, "Collector did not connect to new OpAMP server") } + +func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { + // Create a temporary directory to store the test config file. + tempDir := t.TempDir() + + var agentConfig atomic.Value + initialServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "persistence", map[string]string{"url": initialServer.addr, "storage_dir": tempDir}) + + waitForSupervisorConnection(initialServer.supervisorConnected, true) + + cfg, hash, _, _ := createSimplePipelineCollectorConf(t) + + initialServer.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + require.Eventually(t, func() bool { + // Check if the config file was written to the storage directory + _, err := os.Stat(path.Join(tempDir, "last_recv_remote_config.dat")) + return err == nil + }, 5*time.Second, 250*time.Millisecond, "Config file was not written to persistent storage directory") + + agentConfig.Store("") + s.Shutdown() + initialServer.shutdown() + + newServer := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + return &protobufs.ServerToAgent{} + }, + }) + defer newServer.shutdown() + + s1 := newSupervisor(t, "persistence", map[string]string{"url": newServer.addr, "storage_dir": tempDir}) + defer s1.Shutdown() + + waitForSupervisorConnection(newServer.supervisorConnected, true) + + newServer.sendToSupervisor(&protobufs.ServerToAgent{ + Flags: uint64(protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState), + }) + + // Check that the new Supervisor instance starts with the configuration from the last received remote config + require.Eventually(t, func() bool { + loadedConfig, ok := agentConfig.Load().(string) + if !ok { + return false + } + + return strings.Contains(loadedConfig, "filelog") + }, 10*time.Second, 500*time.Millisecond, "Collector was not started with the last received remote config") + +} diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index 7da321ec7748..eb1ef11584bd 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -16,6 +16,7 @@ require ( go.opentelemetry.io/collector/semconv v0.99.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + google.golang.org/protobuf v1.33.0 ) require ( @@ -31,6 +32,5 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sys v0.18.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/cmd/opampsupervisor/supervisor/config/config.go b/cmd/opampsupervisor/supervisor/config/config.go index ff020f00db0c..fc353dca4a0a 100644 --- a/cmd/opampsupervisor/supervisor/config/config.go +++ b/cmd/opampsupervisor/supervisor/config/config.go @@ -14,6 +14,12 @@ type Supervisor struct { Server *OpAMPServer Agent *Agent Capabilities *Capabilities `mapstructure:"capabilities"` + Storage *Storage `mapstructure:"storage"` +} + +type Storage struct { + // Directory is the directory where the Supervisor will store its data. + Directory string `mapstructure:"directory"` } // Capabilities is the set of capabilities that the Supervisor supports. diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 1aa757a54290..339c05ecf2b2 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -13,6 +13,7 @@ import ( "net" "net/http" "os" + "path/filepath" "sort" "sync" "sync/atomic" @@ -34,6 +35,7 @@ import ( "go.opentelemetry.io/collector/config/configtls" semconv "go.opentelemetry.io/collector/semconv/v1.21.0" "go.uber.org/zap" + "google.golang.org/protobuf/proto" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/commander" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" @@ -49,6 +51,9 @@ var ( //go:embed templates/owntelemetry.yaml ownTelemetryTpl string + + lastRecvRemoteConfigFile = "last_recv_remote_config.dat" + lastRecvOwnMetricsConfigFile = "last_recv_own_metrics_config.dat" ) // Supervisor implements supervising of OpenTelemetry Collector and uses OpAMPClient @@ -141,13 +146,13 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { return nil, fmt.Errorf("could not get bootstrap info from the Collector: %w", err) } - port, err := s.findRandomPort() + healthCheckPort, err := s.findRandomPort() if err != nil { return nil, fmt.Errorf("could not find port for health check: %w", err) } - s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", port) + s.agentHealthCheckEndpoint = fmt.Sprintf("localhost:%d", healthCheckPort) logger.Debug("Supervisor starting", zap.String("id", s.instanceID.String())) @@ -159,7 +164,7 @@ func NewSupervisor(logger *zap.Logger, configFile string) (*Supervisor, error) { } if connErr := s.waitForOpAMPConnection(); connErr != nil { - return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", err) + return nil, fmt.Errorf("failed to connect to the OpAMP server: %w", connErr) } s.commander, err = commander.NewCommander( @@ -547,9 +552,10 @@ func (s *Supervisor) composeExtraLocalConfig() []byte { } func (s *Supervisor) loadAgentEffectiveConfig() { - var effectiveConfigBytes []byte + var effectiveConfigBytes, effFromFile, lastRecvRemoteConfig, lastRecvOwnMetricsConfig []byte + var err error - effFromFile, err := os.ReadFile(s.effectiveConfigFilePath) + effFromFile, err = os.ReadFile(s.effectiveConfigFilePath) if err == nil { // We have an effective config file. effectiveConfigBytes = effFromFile @@ -559,6 +565,50 @@ func (s *Supervisor) loadAgentEffectiveConfig() { } s.effectiveConfig.Store(string(effectiveConfigBytes)) + + if s.config.Capabilities != nil && s.config.Capabilities.AcceptsRemoteConfig != nil && + *s.config.Capabilities.AcceptsRemoteConfig && + s.config.Storage != nil { + // Try to load the last received remote config if it exists. + lastRecvRemoteConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile)) + if err == nil { + config := &protobufs.AgentRemoteConfig{} + err = proto.Unmarshal(lastRecvRemoteConfig, config) + if err != nil { + s.logger.Error("Cannot parse last received remote config", zap.Error(err)) + } else { + s.remoteConfig = config + } + } else { + s.logger.Error("error while reading last received config", zap.Error(err)) + } + } else { + s.logger.Debug("Remote config is not supported, will not attempt to load config from fil") + } + + if s.config.Capabilities != nil && s.config.Capabilities.ReportsOwnMetrics != nil && + *s.config.Capabilities.ReportsOwnMetrics && + s.config.Storage != nil { + // Try to load the last received own metrics config if it exists. + lastRecvOwnMetricsConfig, err = os.ReadFile(filepath.Join(s.config.Storage.Directory, lastRecvOwnMetricsConfigFile)) + if err == nil { + set := &protobufs.TelemetryConnectionSettings{} + err = proto.Unmarshal(lastRecvOwnMetricsConfig, set) + if err != nil { + s.logger.Error("Cannot parse last received own metrics config", zap.Error(err)) + } else { + s.setupOwnMetrics(context.Background(), set) + } + } + } else { + s.logger.Debug("Own metrics is not supported, will not attempt to load config from file") + } + + _, err = s.recalcEffectiveConfig() + if err != nil { + s.logger.Error("Error composing effective config. Ignoring received config", zap.Error(err)) + return + } } // createEffectiveConfigMsg create an EffectiveConfig with the content of the @@ -624,6 +674,7 @@ func (s *Supervisor) setupOwnMetrics(_ context.Context, settings *protobufs.Tele // 2) the own metrics config section // 3) the local override config that is hard-coded in the Supervisor. func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) (configChanged bool, err error) { + var k = koanf.New(".") // Begin with empty config. We will merge received configs on top of it. @@ -631,32 +682,37 @@ func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) return false, err } - // Sort to make sure the order of merging is stable. - var names []string - for name := range config.Config.ConfigMap { - if name == "" { - // skip instance config - continue + if config != nil && config.Config != nil { + // Sort to make sure the order of merging is stable. + var names []string + for name := range config.Config.ConfigMap { + if name == "" { + // skip instance config + continue + } + names = append(names, name) } - names = append(names, name) - } - sort.Strings(names) + sort.Strings(names) - // Append instance config as the last item. - names = append(names, "") + // Append instance config as the last item. + names = append(names, "") - // Merge received configs. - for _, name := range names { - item := config.Config.ConfigMap[name] - var k2 = koanf.New(".") - err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser()) - if err != nil { - return false, fmt.Errorf("cannot parse config named %s: %w", name, err) - } - err = k.Merge(k2) - if err != nil { - return false, fmt.Errorf("cannot merge config named %s: %w", name, err) + // Merge received configs. + for _, name := range names { + item := config.Config.ConfigMap[name] + if item == nil { + continue + } + var k2 = koanf.New(".") + err = k2.Load(rawbytes.Provider(item.Body), yaml.Parser()) + if err != nil { + return false, fmt.Errorf("cannot parse config named %s: %w", name, err) + } + err = k.Merge(k2) + if err != nil { + return false, fmt.Errorf("cannot merge config named %s: %w", name, err) + } } } @@ -910,9 +966,38 @@ func (s *Supervisor) Shutdown() { s.supervisorWG.Wait() } +func (s *Supervisor) saveLastReceivedConfig(config *protobufs.AgentRemoteConfig) error { + if s.config.Storage == nil { + return nil + } + + cfg, err := proto.Marshal(config) + if err != nil { + return err + } + + return os.WriteFile(filepath.Join(s.config.Storage.Directory, lastRecvRemoteConfigFile), cfg, 0600) +} + +func (s *Supervisor) saveLastReceivedOwnTelemetrySettings(set *protobufs.TelemetryConnectionSettings, filePath string) error { + if s.config.Storage == nil { + return nil + } + + cfg, err := proto.Marshal(set) + if err != nil { + return err + } + + return os.WriteFile(filepath.Join(s.config.Storage.Directory, filePath), cfg, 0600) +} + func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { configChanged := false if msg.RemoteConfig != nil { + if err := s.saveLastReceivedConfig(msg.RemoteConfig); err != nil { + s.logger.Error("Could not save last received remote config", zap.Error(err)) + } s.remoteConfig = msg.RemoteConfig s.logger.Debug("Received remote config from server", zap.String("hash", fmt.Sprintf("%x", s.remoteConfig.ConfigHash))) @@ -939,6 +1024,9 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { } if msg.OwnMetricsConnSettings != nil { + if err := s.saveLastReceivedOwnTelemetrySettings(msg.OwnMetricsConnSettings, lastRecvOwnMetricsConfigFile); err != nil { + s.logger.Error("Could not save last received own telemetry settings", zap.Error(err)) + } configChanged = s.setupOwnMetrics(ctx, msg.OwnMetricsConnSettings) || configChanged } diff --git a/cmd/opampsupervisor/supervisor/supervisor_test.go b/cmd/opampsupervisor/supervisor/supervisor_test.go index f247b169907a..2ac31db6c10e 100644 --- a/cmd/opampsupervisor/supervisor/supervisor_test.go +++ b/cmd/opampsupervisor/supervisor/supervisor_test.go @@ -12,11 +12,15 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" "github.com/stretchr/testify/require" "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor/config" ) func Test_composeEffectiveConfig(t *testing.T) { + acceptsRemoteConfig := true s := Supervisor{ logger: zap.NewNop(), + config: config.Supervisor{Capabilities: &config.Capabilities{AcceptsRemoteConfig: &acceptsRemoteConfig}}, hasNewConfig: make(chan struct{}, 1), effectiveConfigFilePath: "effective.yaml", agentConfigOwnMetricsSection: &atomic.Value{}, @@ -37,6 +41,22 @@ func Test_composeEffectiveConfig(t *testing.T) { }, } + fileLogConfig := ` +receivers: + filelog: + include: ['/test/logs/input.log'] + start_at: "beginning" + +exporters: + file: + path: '/test/logs/output.log' + +service: + pipelines: + logs: + receivers: [filelog] + exporters: [file]` + require.NoError(t, s.createTemplates()) s.loadAgentEffectiveConfig() @@ -44,7 +64,7 @@ func Test_composeEffectiveConfig(t *testing.T) { Config: &protobufs.AgentConfigMap{ ConfigMap: map[string]*protobufs.AgentConfigFile{ "": { - Body: []byte(""), + Body: []byte(fileLogConfig), }, }, }, diff --git a/cmd/opampsupervisor/testdata/collector/effective_config.yaml b/cmd/opampsupervisor/testdata/collector/effective_config.yaml index 58d6940011b6..5573cc570593 100644 --- a/cmd/opampsupervisor/testdata/collector/effective_config.yaml +++ b/cmd/opampsupervisor/testdata/collector/effective_config.yaml @@ -1,9 +1,23 @@ +exporters: + file: + path: /test/logs/output.log extensions: health_check: endpoint: localhost:8000 +receivers: + filelog: + include: + - /test/logs/input.log + start_at: beginning service: extensions: - health_check + pipelines: + logs: + exporters: + - file + receivers: + - filelog telemetry: logs: encoding: json diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_persistence.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_persistence.yaml new file mode 100644 index 000000000000..7595f758f851 --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_persistence.yaml @@ -0,0 +1,17 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + +storage: + directory: {{.storage_dir}} + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}