diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index e2e73d437bfa..4dea78961bca 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -12,7 +12,7 @@ env: # We limit cache download as a whole to 5 minutes. SEGMENT_DOWNLOAD_TIMEOUT_MINS: 2 jobs: - docker-build: + collector-build: runs-on: ubuntu-latest steps: - name: Checkout @@ -33,6 +33,40 @@ jobs: - name: Install dependencies if: steps.go-cache.outputs.cache-hit != 'true' run: make -j2 gomoddownload + - name: Build Collector + run: make otelcontribcol + - name: Upload Collector Binary + uses: actions/upload-artifact@v3 + with: + name: collector-binary + path: ./bin/* + docker-build: + runs-on: ubuntu-latest + needs: collector-build + steps: + - name: Checkout + uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: ~1.20.8 + cache: false + - name: Cache Go + id: go-cache + timeout-minutes: 5 + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Download Collector Binary + uses: actions/download-artifact@v3 + with: + name: collector-binary + path: bin/ - name: Build Docker Image run: | make docker-otelcontribcol @@ -107,3 +141,35 @@ jobs: run: | cd receiver/k8sobjectsreceiver go test -v --tags=e2e + supervisor-test: + runs-on: ubuntu-latest + needs: collector-build + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v4 + with: + go-version: ~1.20.8 + cache: false + - name: Cache Go + id: go-cache + timeout-minutes: 5 + uses: actions/cache@v3 + with: + path: | + ~/go/bin + ~/go/pkg/mod + key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }} + - name: Install dependencies + if: steps.go-cache.outputs.cache-hit != 'true' + run: make -j2 gomoddownload + - name: Download Collector Binary + uses: actions/download-artifact@v3 + with: + name: collector-binary + path: bin/ + - run: chmod +x bin/* + - name: Run opampsupervisor e2e tests + run: | + cd cmd/opampsupervisor + go test -v --tags=e2e + diff --git a/cmd/opampsupervisor/Makefile b/cmd/opampsupervisor/Makefile index ded7a36092dc..fea4c327e6cc 100644 --- a/cmd/opampsupervisor/Makefile +++ b/cmd/opampsupervisor/Makefile @@ -1 +1,5 @@ include ../../Makefile.Common + +e2e-test: + make -C ../../ otelcontribcol + go test -v --tags=e2e . diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go new file mode 100644 index 000000000000..a9be00a7e4d1 --- /dev/null +++ b/cmd/opampsupervisor/e2e_test.go @@ -0,0 +1,388 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +//go:build e2e +// +build e2e + +package main + +import ( + "bytes" + "context" + "crypto/sha256" + "io" + "log" + "net/http" + "net/http/httptest" + "os" + "path" + "runtime" + "strings" + "sync/atomic" + "testing" + "text/template" + "time" + + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/open-telemetry/opamp-go/server" + "github.com/open-telemetry/opamp-go/server/types" + "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor/supervisor" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +type testLogger struct { + t *testing.T +} + +func (tl testLogger) Debugf(format string, args ...any) { + tl.t.Logf(format, args...) +} + +func (tl testLogger) Errorf(format string, args ...any) { + tl.t.Logf(format, args...) +} + +func defaultConnectingHandler(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse { + return func(request *http.Request) types.ConnectionResponse { + return types.ConnectionResponse{ + Accept: true, + ConnectionCallbacks: connectionCallbacks, + } + } +} + +// onConnectingFuncFactory is a function that will be given to server.CallbacksStruct as +// OnConnectingFunc. This allows changing the ConnectionCallbacks both from the newOpAMPServer +// caller and inside of newOpAMP Server, and for custom implementations of the value for `Accept` +// in types.ConnectionResponse. +type onConnectingFuncFactory func(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse + +type testingOpAMPServer struct { + addr string + supervisorConnected chan bool + sendToSupervisor func(*protobufs.ServerToAgent) +} + +func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { + var agentConn atomic.Value + var isAgentConnected atomic.Bool + connectedChan := make(chan bool) + s := server.New(testLogger{t: t}) + onConnectedFunc := callbacks.OnConnectedFunc + callbacks.OnConnectedFunc = func(conn types.Connection) { + agentConn.Store(conn) + isAgentConnected.Store(true) + connectedChan <- true + if onConnectedFunc != nil { + onConnectedFunc(conn) + } + } + onConnectionCloseFunc := callbacks.OnConnectionCloseFunc + callbacks.OnConnectionCloseFunc = func(conn types.Connection) { + isAgentConnected.Store(false) + connectedChan <- false + if onConnectionCloseFunc != nil { + onConnectionCloseFunc(conn) + } + } + handler, _, err := s.Attach(server.Settings{ + Callbacks: server.CallbacksStruct{ + OnConnectingFunc: connectingCallback(callbacks), + }, + }) + require.NoError(t, err) + mux := http.NewServeMux() + mux.HandleFunc("/v1/opamp", handler) + httpSrv := httptest.NewServer(mux) + + shutdown := func() { + t.Log("Shutting down") + err := s.Stop(context.Background()) + assert.NoError(t, err) + httpSrv.Close() + } + send := func(msg *protobufs.ServerToAgent) { + if !isAgentConnected.Load() { + require.Fail(t, "Agent connection has not been established") + } + + agentConn.Load().(types.Connection).Send(context.Background(), msg) + } + t.Cleanup(func() { + waitForSupervisorConnection(connectedChan, false) + shutdown() + }) + return &testingOpAMPServer{ + addr: httpSrv.Listener.Addr().String(), + supervisorConnected: connectedChan, + sendToSupervisor: send, + } +} + +func newSupervisor(t *testing.T, configType string, extraConfigData map[string]string) *supervisor.Supervisor { + tpl, err := os.ReadFile(path.Join("testdata", "supervisor", "supervisor_"+configType+".yaml")) + require.NoError(t, err) + + templ, err := template.New("").Parse(string(tpl)) + require.NoError(t, err) + + var buf bytes.Buffer + var extension string + if runtime.GOOS == "windows" { + extension = ".exe" + } + configData := map[string]string{ + "goos": runtime.GOOS, + "goarch": runtime.GOARCH, + "extension": extension, + } + + for key, val := range extraConfigData { + configData[key] = val + } + err = templ.Execute(&buf, configData) + require.NoError(t, err) + cfgFile, _ := os.CreateTemp(t.TempDir(), "config_*.yaml") + _, err = cfgFile.Write(buf.Bytes()) + require.NoError(t, err) + + s, err := supervisor.NewSupervisor(zap.NewNop(), cfgFile.Name()) + require.NoError(t, err) + + return s +} + +func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { + var agentConfig atomic.Value + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "basic", map[string]string{"url": server.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + cfg, hash, inputFile, outputFile := createSimplePipelineCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + require.Eventually(t, func() bool { + cfg, ok := agentConfig.Load().(string) + if ok { + // The effective config may be structurally different compared to what was sent, + // so just check that it includes some strings we know to be unique to the remote config. + return strings.Contains(cfg, inputFile.Name()) && strings.Contains(cfg, outputFile.Name()) + } + + return false + }, 5*time.Second, 500*time.Millisecond, "Collector was not started with remote config") + + n, err := inputFile.WriteString("{\"body\":\"hello, world\"}\n") + require.NotZero(t, n, "Could not write to input file") + require.NoError(t, err) + + require.Eventually(t, func() bool { + logRecord := make([]byte, 1024) + n, _ := outputFile.Read(logRecord) + + return n != 0 + }, 10*time.Second, 500*time.Millisecond, "Log never appeared in output") +} + +func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { + var healthReport atomic.Value + var agentConfig atomic.Value + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + if message.Health != nil { + healthReport.Store(message.Health) + } + if message.EffectiveConfig != nil { + config := message.EffectiveConfig.ConfigMap.ConfigMap[""] + if config != nil { + agentConfig.Store(string(config.Body)) + } + } + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "basic", map[string]string{"url": server.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + cfg, hash := createBadCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + require.Eventually(t, func() bool { + cfg, ok := agentConfig.Load().(string) + if ok { + // The effective config may be structurally different compared to what was sent, + // so just check that it includes some strings we know to be unique to the remote config. + return strings.Contains(cfg, "doesntexist") + } + + return false + }, 5*time.Second, 500*time.Millisecond, "Collector was not started with remote config") + + require.Eventually(t, func() bool { + health := healthReport.Load().(*protobufs.ComponentHealth) + + if health != nil { + return !health.Healthy && health.LastError != "" + } + + return false + }, 5*time.Second, 250*time.Millisecond, "Supervisor never reported that the Collector was unhealthy") + + cfg, hash, _, _ = createSimplePipelineCollectorConf(t) + + server.sendToSupervisor(&protobufs.ServerToAgent{ + RemoteConfig: &protobufs.AgentRemoteConfig{ + Config: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": {Body: cfg.Bytes()}, + }, + }, + ConfigHash: hash, + }, + }) + + require.Eventually(t, func() bool { + health := healthReport.Load().(*protobufs.ComponentHealth) + + if health != nil { + return health.Healthy && health.LastError == "" + } + + return false + }, 5*time.Second, 250*time.Millisecond, "Supervisor never reported that the Collector became healthy") +} + +func TestSupervisorConfiguresCapabilities(t *testing.T) { + var capabilities atomic.Uint64 + server := newOpAMPServer( + t, + defaultConnectingHandler, + server.ConnectionCallbacksStruct{ + OnMessageFunc: func(_ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + capabilities.Store(message.Capabilities) + + return &protobufs.ServerToAgent{} + }, + }) + + s := newSupervisor(t, "nocap", map[string]string{"url": server.addr}) + defer s.Shutdown() + + waitForSupervisorConnection(server.supervisorConnected, true) + + require.Eventually(t, func() bool { + cap := capabilities.Load() + + return cap == uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus) + }, 5*time.Second, 250*time.Millisecond) +} + +// Creates a Collector config that reads and writes logs to files and provides +// file descriptors for I/O operations to those files. The files are placed +// in a unique temp directory that is cleaned up after the test's completion. +func createSimplePipelineCollectorConf(t *testing.T) (*bytes.Buffer, []byte, *os.File, *os.File) { + wd, err := os.Getwd() + require.NoError(t, err) + + // Create input and output files so we can "communicate" with a Collector binary. + // The testing package will automatically clean these up after each test. + tempDir := t.TempDir() + inputFile, err := os.CreateTemp(tempDir, "input_*.yaml") + require.NoError(t, err) + + outputFile, err := os.CreateTemp(tempDir, "output_*.yaml") + require.NoError(t, err) + + colCfgTpl, err := os.ReadFile(path.Join(wd, "testdata", "collector", "simple_pipeline.yaml")) + require.NoError(t, err) + + templ, err := template.New("").Parse(string(colCfgTpl)) + require.NoError(t, err) + + var confmapBuf bytes.Buffer + err = templ.Execute( + &confmapBuf, + map[string]string{ + "inputLogFile": inputFile.Name(), + "outputLogFile": outputFile.Name(), + }, + ) + require.NoError(t, err) + + h := sha256.New() + if _, err := io.Copy(h, bytes.NewBuffer(confmapBuf.Bytes())); err != nil { + log.Fatal(err) + } + + return &confmapBuf, h.Sum(nil), inputFile, outputFile +} + +func createBadCollectorConf(t *testing.T) (*bytes.Buffer, []byte) { + colCfg, err := os.ReadFile(path.Join("testdata", "collector", "bad_config.yaml")) + require.NoError(t, err) + + h := sha256.New() + if _, err := io.Copy(h, bytes.NewBuffer(colCfg)); err != nil { + log.Fatal(err) + } + + return bytes.NewBuffer(colCfg), h.Sum(nil) +} + +// Wait for the Supervisor to connect to or disconnect from the OpAMP server +func waitForSupervisorConnection(connection chan bool, connected bool) { + select { + case <-time.After(5 * time.Second): + break + case state := <-connection: + if state == connected { + break + } + } +} diff --git a/cmd/opampsupervisor/testdata/supervisor_darwin.yaml b/cmd/opampsupervisor/examples/supervisor_darwin.yaml similarity index 100% rename from cmd/opampsupervisor/testdata/supervisor_darwin.yaml rename to cmd/opampsupervisor/examples/supervisor_darwin.yaml diff --git a/cmd/opampsupervisor/testdata/supervisor_linux.yaml b/cmd/opampsupervisor/examples/supervisor_linux.yaml similarity index 100% rename from cmd/opampsupervisor/testdata/supervisor_linux.yaml rename to cmd/opampsupervisor/examples/supervisor_linux.yaml diff --git a/cmd/opampsupervisor/testdata/supervisor_windows.yaml b/cmd/opampsupervisor/examples/supervisor_windows.yaml similarity index 100% rename from cmd/opampsupervisor/testdata/supervisor_windows.yaml rename to cmd/opampsupervisor/examples/supervisor_windows.yaml diff --git a/cmd/opampsupervisor/go.mod b/cmd/opampsupervisor/go.mod index 17d5bceea48c..93ebc0eb675c 100644 --- a/cmd/opampsupervisor/go.mod +++ b/cmd/opampsupervisor/go.mod @@ -10,6 +10,7 @@ require ( github.com/knadh/koanf/v2 v2.0.1 github.com/oklog/ulid/v2 v2.1.0 github.com/open-telemetry/opamp-go v0.10.0 + github.com/stretchr/testify v1.8.4 go.opentelemetry.io/collector/config/configtls v0.91.0 go.uber.org/zap v1.26.0 ) @@ -23,7 +24,6 @@ require ( github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/stretchr/testify v1.8.4 // indirect go.opentelemetry.io/collector/config/configopaque v0.91.0 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/sys v0.14.0 // indirect diff --git a/cmd/opampsupervisor/testdata/collector/bad_config.yaml b/cmd/opampsupervisor/testdata/collector/bad_config.yaml new file mode 100644 index 000000000000..f9a17d198368 --- /dev/null +++ b/cmd/opampsupervisor/testdata/collector/bad_config.yaml @@ -0,0 +1,11 @@ +receivers: + doesntexist: + +exporters: + doesntexist: + +service: + pipelines: + traces: + receivers: [doesntexist] + exporters: [doesntexist] diff --git a/cmd/opampsupervisor/testdata/collector/simple_pipeline.yaml b/cmd/opampsupervisor/testdata/collector/simple_pipeline.yaml new file mode 100644 index 000000000000..e78c379f7ba6 --- /dev/null +++ b/cmd/opampsupervisor/testdata/collector/simple_pipeline.yaml @@ -0,0 +1,14 @@ +receivers: + filelog: + include: [{{.inputLogFile}}] + start_at: "beginning" + +exporters: + file: + path: {{.outputLogFile}} + +service: + pipelines: + logs: + receivers: [filelog] + exporters: [file] diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml new file mode 100644 index 000000000000..b5ef4c5db720 --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_basic.yaml @@ -0,0 +1,14 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: true + reports_own_metrics: true + reports_health: true + accepts_remote_config: true + reports_remote_config: true + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}} diff --git a/cmd/opampsupervisor/testdata/supervisor/supervisor_nocap.yaml b/cmd/opampsupervisor/testdata/supervisor/supervisor_nocap.yaml new file mode 100644 index 000000000000..5b61ad4b1b68 --- /dev/null +++ b/cmd/opampsupervisor/testdata/supervisor/supervisor_nocap.yaml @@ -0,0 +1,14 @@ +server: + endpoint: ws://{{.url}}/v1/opamp + tls: + insecure: true + +capabilities: + reports_effective_config: false + reports_own_metrics: false + reports_health: false + accepts_remote_config: false + reports_remote_config: false + +agent: + executable: ../../bin/otelcontribcol_{{.goos}}_{{.goarch}}{{.extension}}