diff --git a/.editorconfig b/.editorconfig index d41df6c789..3fb11b7577 100644 --- a/.editorconfig +++ b/.editorconfig @@ -28,8 +28,7 @@ indent_size = 4 indent_style = space trim_trailing_whitespace = false - -[{*.yaml,*.yml}] +[{*.yaml,*.yml, otelcol.tmpl}] indent_size = 2 indent_style = space trim_trailing_whitespace = true diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2c904fa009..4a96c94e8d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -299,7 +299,7 @@ jobs: benchmark-data-dir-path: "" # Set auto-push to false since GitHub API token is not given auto-push: false - alert-threshold: '125%' + alert-threshold: '175%' gh-pages-branch: "benchmark-results" fail-on-alert: true diff --git a/Makefile b/Makefile index 7e25e16abc..0a372f8cd3 100644 --- a/Makefile +++ b/Makefile @@ -130,7 +130,7 @@ format: ## Format code unit-test: $(TEST_BUILD_DIR) ## Run unit tests @CGO_ENABLED=0 $(GOTEST) -count=1 -coverprofile=$(TEST_BUILD_DIR)/tmp_coverage.out -coverpkg=./... -covermode count ./internal/... ./api/... ./cmd/... ./pkg/... - @cat $(TEST_BUILD_DIR)/tmp_coverage.out | grep -v ".pb.go" | grep -v ".gen.go" | grep -v ".pb.validate.go" | grep -v "fake_" | grep -v "github.com/nginx/agent/v3/test/" > $(TEST_BUILD_DIR)/coverage.out + @cat $(TEST_BUILD_DIR)/tmp_coverage.out | grep -v ".pb.go" | grep -v ".gen.go" | grep -v ".pb.validate.go" | grep -v "fake_" | grep -v "_utils.go" | grep -v "github.com/nginx/agent/v3/test/" > $(TEST_BUILD_DIR)/coverage.out @rm $(TEST_BUILD_DIR)/tmp_coverage.out @$(GOTOOL) cover -html=$(TEST_BUILD_DIR)/coverage.out -o $(TEST_BUILD_DIR)/coverage.html @printf "\nTotal code coverage: " && $(GOTOOL) cover -func=$(TEST_BUILD_DIR)/coverage.out | grep 'total:' | awk '{print $$3}' diff --git a/api/grpc/mpi/v1/command.pb.go b/api/grpc/mpi/v1/command.pb.go index 48d7a22374..e390eb5ac1 100644 --- a/api/grpc/mpi/v1/command.pb.go +++ b/api/grpc/mpi/v1/command.pb.go @@ -8,7 +8,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc (unknown) // source: mpi/v1/command.proto diff --git a/api/grpc/mpi/v1/common.pb.go b/api/grpc/mpi/v1/common.pb.go index 3d71214d35..8f28d16ed7 100644 --- a/api/grpc/mpi/v1/common.pb.go +++ b/api/grpc/mpi/v1/common.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc (unknown) // source: mpi/v1/common.proto diff --git a/api/grpc/mpi/v1/files.pb.go b/api/grpc/mpi/v1/files.pb.go index faefbf7310..b13dce4293 100644 --- a/api/grpc/mpi/v1/files.pb.go +++ b/api/grpc/mpi/v1/files.pb.go @@ -5,7 +5,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.1 +// protoc-gen-go v1.35.2 // protoc (unknown) // source: mpi/v1/files.proto diff --git a/go.mod b/go.mod index b6ed31ecf3..e19b6d469a 100644 --- a/go.mod +++ b/go.mod @@ -81,6 +81,7 @@ require ( go.opentelemetry.io/otel v1.28.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e golang.org/x/mod v0.21.0 golang.org/x/sync v0.8.0 google.golang.org/protobuf v1.34.2 @@ -297,7 +298,6 @@ require ( go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/arch v0.7.0 // indirect - golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect golang.org/x/oauth2 v0.22.0 // indirect golang.org/x/term v0.23.0 // indirect golang.org/x/time v0.6.0 // indirect diff --git a/internal/bus/fake_message_pipe.go b/internal/bus/busfakes/fake_message_pipe.go similarity index 67% rename from internal/bus/fake_message_pipe.go rename to internal/bus/busfakes/fake_message_pipe.go index e7de9a5362..d0c697b30e 100644 --- a/internal/bus/fake_message_pipe.go +++ b/internal/bus/busfakes/fake_message_pipe.go @@ -3,22 +3,24 @@ // This source code is licensed under the Apache License, Version 2.0 license found in the // LICENSE file in the root directory of this source tree. -package bus +package busfakes import ( "context" "sync" + + "github.com/nginx/agent/v3/internal/bus" ) // FakeMessagePipe is a mock message pipe type FakeMessagePipe struct { - plugins []Plugin - messages []*Message - processedMessages []*Message + plugins []bus.Plugin + messages []*bus.Message + processedMessages []*bus.Message messagesLock sync.Mutex } -var _ MessagePipeInterface = &FakeMessagePipe{} +var _ bus.MessagePipeInterface = &FakeMessagePipe{} func NewFakeMessagePipe() *FakeMessagePipe { return &FakeMessagePipe{ @@ -26,32 +28,45 @@ func NewFakeMessagePipe() *FakeMessagePipe { } } -func (p *FakeMessagePipe) Register(size int, plugins []Plugin) error { +func (p *FakeMessagePipe) Register(size int, plugins []bus.Plugin) error { p.plugins = append(p.plugins, plugins...) return nil } func (p *FakeMessagePipe) DeRegister(ctx context.Context, pluginNames []string) error { - var plugins []Plugin + var plugins []bus.Plugin plugins = p.findPlugins(pluginNames, plugins) for _, plugin := range plugins { - index := getIndex(plugin.Info().Name, p.plugins) + index := p.GetIndex(plugin.Info().Name, p.plugins) p.unsubscribePlugin(ctx, index, plugin) } return nil } -func (p *FakeMessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) { +func (p *FakeMessagePipe) GetIndex(pluginName string, plugins []bus.Plugin) int { + for index, plugin := range plugins { + if pluginName == plugin.Info().Name { + return index + } + } + + return -1 +} + +func (p *FakeMessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin bus.Plugin) { if index != -1 { p.plugins = append(p.plugins[:index], p.plugins[index+1:]...) - plugin.Close(ctx) + err := plugin.Close(ctx) + if err != nil { + return + } } } -func (p *FakeMessagePipe) findPlugins(pluginNames []string, plugins []Plugin) []Plugin { +func (p *FakeMessagePipe) findPlugins(pluginNames []string, plugins []bus.Plugin) []bus.Plugin { for _, name := range pluginNames { for _, plugin := range p.plugins { if plugin.Info().Name == name { @@ -63,21 +78,21 @@ func (p *FakeMessagePipe) findPlugins(pluginNames []string, plugins []Plugin) [] return plugins } -func (p *FakeMessagePipe) Process(ctx context.Context, msgs ...*Message) { +func (p *FakeMessagePipe) Process(_ context.Context, msgs ...*bus.Message) { p.messagesLock.Lock() defer p.messagesLock.Unlock() p.messages = append(p.messages, msgs...) } -func (p *FakeMessagePipe) GetMessages() []*Message { +func (p *FakeMessagePipe) GetMessages() []*bus.Message { p.messagesLock.Lock() defer p.messagesLock.Unlock() return p.messages } -func (p *FakeMessagePipe) GetProcessedMessages() []*Message { +func (p *FakeMessagePipe) GetProcessedMessages() []*bus.Message { return p.processedMessages } @@ -85,8 +100,8 @@ func (p *FakeMessagePipe) ClearMessages() { p.messagesLock.Lock() defer p.messagesLock.Unlock() - p.processedMessages = []*Message{} - p.messages = []*Message{} + p.processedMessages = []*bus.Message{} + p.messages = []*bus.Message{} } func (p *FakeMessagePipe) Run(ctx context.Context) { @@ -101,7 +116,7 @@ func (p *FakeMessagePipe) Run(ctx context.Context) { } func (p *FakeMessagePipe) RunWithoutInit(ctx context.Context) { - var message *Message + var message *bus.Message for len(p.messages) > 0 { message, p.messages = p.messages[0], p.messages[1:] @@ -112,7 +127,7 @@ func (p *FakeMessagePipe) RunWithoutInit(ctx context.Context) { } } -func (p *FakeMessagePipe) GetPlugins() []Plugin { +func (p *FakeMessagePipe) GetPlugins() []bus.Plugin { return p.plugins } diff --git a/internal/bus/message_pipe.go b/internal/bus/message_pipe.go index 6c3784ec7e..483b39456d 100644 --- a/internal/bus/message_pipe.go +++ b/internal/bus/message_pipe.go @@ -93,7 +93,7 @@ func (p *MessagePipe) DeRegister(ctx context.Context, pluginNames []string) erro plugins := p.findPlugins(pluginNames) for _, plugin := range plugins { - index := getIndex(plugin.Info().Name, p.plugins) + index := p.GetIndex(plugin.Info().Name, p.plugins) err := p.unsubscribePlugin(ctx, index, plugin) if err != nil { @@ -151,12 +151,15 @@ func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin P if index != -1 { p.plugins = append(p.plugins[:index], p.plugins[index+1:]...) - plugin.Close(ctx) + err := plugin.Close(ctx) + if err != nil { + return err + } for _, subscription := range plugin.Subscriptions() { - err := p.bus.Unsubscribe(subscription, plugin.Process) - if err != nil { - return err + unsubErr := p.bus.Unsubscribe(subscription, plugin.Process) + if unsubErr != nil { + return unsubErr } } } @@ -165,7 +168,7 @@ func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin P } func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin { - plugins := []Plugin{} + var plugins []Plugin for _, name := range pluginNames { for _, plugin := range p.plugins { @@ -178,7 +181,7 @@ func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin { return plugins } -func getIndex(pluginName string, plugins []Plugin) int { +func (p *MessagePipe) GetIndex(pluginName string, plugins []Plugin) int { for index, plugin := range plugins { if pluginName == plugin.Info().Name { return index diff --git a/internal/collector/factories.go b/internal/collector/factories.go index df3a431f64..12257cb34e 100644 --- a/internal/collector/factories.go +++ b/internal/collector/factories.go @@ -89,7 +89,7 @@ func OTelComponentFactories() (otelcol.Factories, error) { } func createConnectorFactories() (map[component.Type]connector.Factory, error) { - connectorsList := []connector.Factory{} + var connectorsList []connector.Factory return connector.MakeFactoryMap(connectorsList...) } diff --git a/internal/collector/factories_test.go b/internal/collector/factories_test.go index 5cefda51eb..5818e6e775 100644 --- a/internal/collector/factories_test.go +++ b/internal/collector/factories_test.go @@ -7,11 +7,12 @@ package collector import ( "testing" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/stretchr/testify/assert" ) -func TestOTelComponentFactories(t *testing.T) { +func TestOTelComponentFactoriesDefault(t *testing.T) { factories, err := OTelComponentFactories() require.NoError(t, err, "OTelComponentFactories should not return an error") diff --git a/internal/collector/otel_collector_plugin.go b/internal/collector/otel_collector_plugin.go index d2879dbfbc..4e3f1a85f0 100644 --- a/internal/collector/otel_collector_plugin.go +++ b/internal/collector/otel_collector_plugin.go @@ -17,6 +17,7 @@ import ( "github.com/nginx/agent/v3/api/grpc/mpi/v1" "github.com/nginx/agent/v3/internal/backoff" "github.com/nginx/agent/v3/internal/bus" + "github.com/nginx/agent/v3/internal/collector/types" "github.com/nginx/agent/v3/internal/config" "github.com/nginx/agent/v3/internal/model" "go.opentelemetry.io/collector/otelcol" @@ -30,7 +31,7 @@ const ( type ( // Collector The OTel collector plugin start an embedded OTel collector for metrics collection in the OTel format. Collector struct { - service *otelcol.Collector + service types.CollectorInterface cancel context.CancelFunc config *config.Config mu *sync.Mutex @@ -71,6 +72,13 @@ func New(conf *config.Config) (*Collector, error) { }, nil } +func (oc *Collector) GetState() otelcol.State { + oc.mu.Lock() + defer oc.mu.Unlock() + + return oc.service.GetState() +} + // Init initializes and starts the plugin func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) error { slog.InfoContext(ctx, "Starting OTel Collector plugin") @@ -106,13 +114,13 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) { for _, receiver := range receivers { if receiver.OtlpTLSConfig == nil { - slog.WarnContext(ctx, "OTEL receiver is configured without TLS. Connections are unencrypted.") + slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.") continue } if receiver.OtlpTLSConfig.GenerateSelfSignedCert { slog.WarnContext(ctx, - "Self-signed certificate for OTEL receiver requested, "+ + "Self-signed certificate for OTel receiver requested, "+ "this is not recommended for production environments.", ) @@ -122,7 +130,7 @@ func (oc *Collector) processReceivers(ctx context.Context, receivers []config.Ot ) } } else { - slog.WarnContext(ctx, "OTEL receiver is configured without TLS. Connections are unencrypted.") + slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.") } } } diff --git a/internal/collector/otel_collector_plugin_test.go b/internal/collector/otel_collector_plugin_test.go index 0c3a04b8a2..941b4c24c3 100644 --- a/internal/collector/otel_collector_plugin_test.go +++ b/internal/collector/otel_collector_plugin_test.go @@ -7,10 +7,10 @@ package collector import ( "bytes" "context" + "errors" "fmt" "strings" "testing" - "time" "github.com/nginx/agent/v3/test/protos" "github.com/nginx/agent/v3/test/stub" @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/otelcol" "github.com/nginx/agent/v3/internal/bus" + "github.com/nginx/agent/v3/internal/collector/types/typesfakes" "github.com/nginx/agent/v3/internal/config" "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/test/helpers" @@ -26,32 +27,113 @@ import ( ) func TestCollector_New(t *testing.T) { - conf := types.OTelConfig(t) - conf.Collector.Log.Path = "" + tests := []struct { + config *config.Config + expectedError error + name string + }{ + { + name: "Nil agent config", + config: nil, + expectedError: errors.New("nil agent config"), + }, + { + name: "Nil collector config", + config: &config.Config{ + Collector: nil, + }, + expectedError: errors.New("nil collector config"), + }, + { + name: "File write error", + config: &config.Config{ + Collector: &config.Collector{ + Log: &config.Log{Path: "/invalid/path"}, + }, + }, + expectedError: errors.New("open /invalid/path: no such file or directory"), + }, + { + name: "Successful initialization", + config: &config.Config{ + Collector: &config.Collector{ + Log: &config.Log{Path: "/tmp/test.log"}, + }, + }, + expectedError: nil, + }, + } - _, err := New(conf) - require.NoError(t, err, "NewCollector should not return an error with valid config") + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + collector, err := New(tt.config) + + if tt.expectedError != nil { + require.Error(t, err) + assert.Equal(t, tt.expectedError.Error(), err.Error()) + } else { + require.NoError(t, err) + assert.NotNil(t, collector) + } + }) + } } func TestCollector_Init(t *testing.T) { - conf := types.OTelConfig(t) - conf.Collector = &config.Collector{} + tests := []struct { + name string + expectedLog string + expectedError bool + }{ + { + name: "Default configured", + expectedError: false, + expectedLog: "", + }, + { + name: "No receivers set in config", + expectedError: true, + expectedLog: "No receivers configured for OTel Collector", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + conf := types.OTelConfig(t) - logBuf := &bytes.Buffer{} - stub.StubLoggerWith(logBuf) + var collector *Collector + var err error + logBuf := &bytes.Buffer{} + stub.StubLoggerWith(logBuf) - collector, err := New(conf) - require.NoError(t, err, "NewCollector should not return an error with valid config") + conf.Collector.Log = &config.Log{Path: "/tmp/test.log"} - initError := collector.Init(context.Background(), nil) - require.NoError(t, initError) + if tt.expectedError { + conf.Collector.Receivers = config.Receivers{} + } + + collector, err = New(conf) + require.NoError(t, err, "NewCollector should not return an error with valid config") + + collector.service = createFakeCollector() - if s := logBuf.String(); !strings.Contains(s, "No receivers configured for OTel Collector. "+ - "Waiting to discover a receiver before starting OTel collector.") { - t.Errorf("Unexpected log %s", s) + initError := collector.Init(context.Background(), nil) + require.NoError(t, initError) + + validateLog(t, tt.expectedLog, logBuf) + + require.NoError(t, collector.Close(context.TODO())) + }) } +} + +func validateLog(t *testing.T, expectedLog string, logBuf *bytes.Buffer) { + t.Helper() - assert.True(t, collector.stopped) + if expectedLog != "" { + if !strings.Contains(logBuf.String(), expectedLog) { + t.Errorf("Expected log to contain %q, but got %q", expectedLog, logBuf.String()) + } + } } func TestCollector_InitAndClose(t *testing.T) { @@ -68,31 +150,17 @@ func TestCollector_InitAndClose(t *testing.T) { require.NoError(t, err) require.NoError(t, collector.Init(ctx, messagePipe), "Init should not return an error") - assert.Eventually( - t, - func() bool { return collector.service.GetState() == otelcol.StateRunning }, - 2*time.Second, - 100*time.Millisecond, - ) + collector.service = createFakeCollector() + + assert.Equal(t, otelcol.StateRunning, collector.GetState()) require.NoError(t, collector.Close(ctx), "Close should not return an error") - assert.Eventually( - t, - func() bool { return collector.service.GetState() == otelcol.StateClosed }, - 2*time.Second, - 100*time.Millisecond, - ) + assert.Equal(t, otelcol.StateClosed, collector.GetState()) } // nolint: revive func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { - nginxPlusMock := helpers.NewMockNGINXPlusAPIServer(t) - defer nginxPlusMock.Close() - - conf := types.OTelConfig(t) - conf.Collector.Log.Path = "" - tests := []struct { name string message *bus.Message @@ -104,26 +172,16 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { Topic: bus.NginxConfigUpdateTopic, Data: &model.NginxConfigContext{ InstanceID: "123", - PlusAPI: fmt.Sprintf("%s/api", nginxPlusMock.URL), + PlusAPI: "", }, }, receivers: config.Receivers{ - HostMetrics: &config.HostMetrics{ - CollectionInterval: time.Minute, - InitialDelay: time.Second, - Scrapers: &config.HostMetricsScrapers{ - CPU: &config.CPUScraper{}, - Disk: &config.DiskScraper{}, - Filesystem: &config.FilesystemScraper{}, - Memory: &config.MemoryScraper{}, - Network: &config.NetworkScraper{}, - }, - }, - OtlpReceivers: types.OtlpReceivers(), + HostMetrics: nil, + OtlpReceivers: nil, NginxPlusReceivers: []config.NginxPlusReceiver{ { InstanceID: "123", - PlusAPI: fmt.Sprintf("%s/api", nginxPlusMock.URL), + PlusAPI: "", }, }, }, @@ -134,7 +192,7 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { Topic: bus.NginxConfigUpdateTopic, Data: &model.NginxConfigContext{ InstanceID: "123", - StubStatus: "http://test.com:8080/stub_status", + StubStatus: "", AccessLogs: []*model.AccessLog{ { Name: "/var/log/nginx/access.log", @@ -144,22 +202,12 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { }, }, receivers: config.Receivers{ - HostMetrics: &config.HostMetrics{ - CollectionInterval: time.Minute, - InitialDelay: time.Second, - Scrapers: &config.HostMetricsScrapers{ - CPU: &config.CPUScraper{}, - Disk: &config.DiskScraper{}, - Filesystem: &config.FilesystemScraper{}, - Memory: &config.MemoryScraper{}, - Network: &config.NetworkScraper{}, - }, - }, - OtlpReceivers: types.OtlpReceivers(), + HostMetrics: nil, + OtlpReceivers: nil, NginxReceivers: []config.NginxReceiver{ { InstanceID: "123", - StubStatus: "http://test.com:8080/stub_status", + StubStatus: "", AccessLogs: []config.AccessLog{ { FilePath: "/var/log/nginx/access.log", @@ -168,59 +216,71 @@ func TestCollector_ProcessNginxConfigUpdateTopic(t *testing.T) { }, }, }, - NginxPlusReceivers: []config.NginxPlusReceiver{}, }, }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { + nginxPlusMock := helpers.NewMockNGINXPlusAPIServer(t) + defer nginxPlusMock.Close() + + conf := types.OTelConfig(t) + + conf.Command = nil + + conf.Collector.Log.Path = "" + conf.Collector.Receivers.HostMetrics = nil + conf.Collector.Receivers.OtlpReceivers = nil + + if len(test.receivers.NginxPlusReceivers) == 1 { + url := fmt.Sprintf("%s/api", nginxPlusMock.URL) + test.receivers.NginxPlusReceivers[0].PlusAPI = url + + model, ok := test.message.Data.(*model.NginxConfigContext) + if !ok { + t.Logf("Can't cast type") + t.Fail() + } + + model.PlusAPI = url + } else { + url := fmt.Sprintf("%s/stub_status", nginxPlusMock.URL) + test.receivers.NginxReceivers[0].StubStatus = url + + model, ok := test.message.Data.(*model.NginxConfigContext) + if !ok { + t.Logf("Can't cast type") + t.Fail() + } + + model.StubStatus = url + } + + conf.Collector.Processors.Batch = nil + conf.Collector.Processors.Attribute = nil + conf.Collector.Processors.Resource = nil + conf.Collector.Extensions.Health = nil + conf.Collector.Extensions.HeadersSetter = nil + conf.Collector.Exporters.PrometheusExporter = nil + collector, err := New(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") + collector.service = createFakeCollector() + ctx := context.Background() messagePipe := bus.NewMessagePipe(10) err = messagePipe.Register(10, []bus.Plugin{collector}) require.NoError(tt, err) require.NoError(tt, collector.Init(ctx, messagePipe), "Init should not return an error") - defer collector.Close(ctx) - - assert.Eventually( - tt, - func() bool { return collector.service.GetState() == otelcol.StateRunning }, - 5*time.Second, - 100*time.Millisecond, - ) collector.Process(ctx, test.message) - assert.Eventually( - tt, - func() bool { return collector.service.GetState() == otelcol.StateRunning }, - 5*time.Second, - 100*time.Millisecond, - ) - - if len(test.receivers.NginxPlusReceivers) > 0 { - assert.Eventually( - tt, - func() bool { return len(collector.config.Collector.Receivers.NginxPlusReceivers) > 0 }, - 5*time.Second, - 100*time.Millisecond, - ) - } - - if len(test.receivers.NginxReceivers) > 0 { - assert.Eventually( - tt, - func() bool { return len(collector.config.Collector.Receivers.NginxReceivers) > 0 }, - 5*time.Second, - 100*time.Millisecond, - ) - } - assert.Equal(tt, test.receivers, collector.config.Collector.Receivers) + + defer collector.Close(ctx) }) } } @@ -290,38 +350,21 @@ func TestCollector_ProcessResourceUpdateTopic(t *testing.T) { collector, err := New(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") + collector.service = createFakeCollector() + ctx := context.Background() messagePipe := bus.NewMessagePipe(10) err = messagePipe.Register(10, []bus.Plugin{collector}) require.NoError(tt, err) require.NoError(tt, collector.Init(ctx, messagePipe), "Init should not return an error") - defer collector.Close(ctx) - - assert.Eventually( - tt, - func() bool { - tt.Logf("Collector state is %+v", collector.service.GetState()) - return collector.service.GetState() == otelcol.StateRunning - }, - 5*time.Second, - 100*time.Millisecond, - ) collector.Process(ctx, test.message) - assert.Eventually( - tt, - func() bool { - tt.Logf("Collector state is %+v", collector.service.GetState()) - return collector.service.GetState() == otelcol.StateRunning - }, - 5*time.Second, - 100*time.Millisecond, - ) - assert.Equal(tt, test.processors, collector.config.Collector.Processors) assert.Equal(tt, test.headers, collector.config.Collector.Extensions.HeadersSetter.Headers) + + defer collector.Close(ctx) }) } } @@ -367,6 +410,8 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) { collector, err := New(conf) require.NoError(tt, err, "NewCollector should not return an error with valid config") + collector.service = createFakeCollector() + ctx := context.Background() messagePipe := bus.NewMessagePipe(10) err = messagePipe.Register(10, []bus.Plugin{collector}) @@ -375,28 +420,8 @@ func TestCollector_ProcessResourceUpdateTopicFails(t *testing.T) { require.NoError(tt, collector.Init(ctx, messagePipe), "Init should not return an error") defer collector.Close(ctx) - assert.Eventually( - tt, - func() bool { - tt.Logf("Collector state is %+v", collector.service.GetState()) - return collector.service.GetState() == otelcol.StateRunning - }, - 5*time.Second, - 100*time.Millisecond, - ) - collector.Process(ctx, test.message) - assert.Eventually( - tt, - func() bool { - tt.Logf("Collector state is %+v", collector.service.GetState()) - return collector.service.GetState() == otelcol.StateRunning - }, - 5*time.Second, - 100*time.Millisecond, - ) - assert.Equal(tt, config.Processors{ Batch: nil, @@ -619,3 +644,15 @@ func TestCollector_updateResourceAttributes(t *testing.T) { }) } } + +func createFakeCollector() *typesfakes.FakeCollectorInterface { + fakeCollector := &typesfakes.FakeCollectorInterface{} + fakeCollector.RunStub = func(ctx context.Context) error { return nil } + fakeCollector.GetStateReturnsOnCall(0, otelcol.StateRunning) + fakeCollector.GetStateReturnsOnCall(1, otelcol.StateClosing) + fakeCollector.ShutdownCalls(func() { + fakeCollector.GetStateReturns(otelcol.StateClosed) + }) + + return fakeCollector +} diff --git a/internal/collector/otelcol.tmpl b/internal/collector/otelcol.tmpl index e6b3baca83..12bdc36ef4 100644 --- a/internal/collector/otelcol.tmpl +++ b/internal/collector/otelcol.tmpl @@ -109,7 +109,9 @@ exporters: {{- range $index, $otlpExporter := .Exporters.OtlpExporters }} otlp/{{$index}}: endpoint: "{{ .Server.Host -}}:{{- .Server.Port }}" - compression: none + {{- if .Compression }} + compression: {{ .Compression }} + {{- end }} timeout: 10s retry_on_failure: enabled: true diff --git a/internal/collector/settings_test.go b/internal/collector/settings_test.go index a512a5bc26..baf29807fe 100644 --- a/internal/collector/settings_test.go +++ b/internal/collector/settings_test.go @@ -115,6 +115,11 @@ func TestTemplateWrite(t *testing.T) { } cfg.Collector.Exporters.OtlpExporters[0].Authenticator = "headers_setter" + // nolint: lll + cfg.Collector.Exporters.OtlpExporters[0].Compression = types.AgentConfig().Collector.Exporters.OtlpExporters[0].Compression + cfg.Collector.Exporters.OtlpExporters[0].Server.Port = 1234 + cfg.Collector.Receivers.OtlpReceivers[0].Server.Port = 4317 + cfg.Collector.Extensions.Health.Server.Port = 1337 require.NotNil(t, cfg) diff --git a/internal/collector/types/collector.go b/internal/collector/types/collector.go new file mode 100644 index 0000000000..e92f54c296 --- /dev/null +++ b/internal/collector/types/collector.go @@ -0,0 +1,22 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package types + +import ( + "context" + + "go.opentelemetry.io/collector/otelcol" +) + +// CollectorInterface The high-level collector interface +type CollectorInterface interface { + Run(ctx context.Context) error + GetState() otelcol.State + Shutdown() +} + +// Ensure the original Collector struct implements your interface +var _ CollectorInterface = (*otelcol.Collector)(nil) diff --git a/internal/collector/types/mocks.go b/internal/collector/types/mocks.go new file mode 100644 index 0000000000..bdb44f6291 --- /dev/null +++ b/internal/collector/types/mocks.go @@ -0,0 +1,9 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. + +package types + +//go:generate go run github.com/maxbrunsfeld/counterfeiter/v6@v6.8.1 -generate +//counterfeiter:generate . CollectorInterface diff --git a/internal/collector/types/typesfakes/fake_collector_interface.go b/internal/collector/types/typesfakes/fake_collector_interface.go new file mode 100644 index 0000000000..cffd05e136 --- /dev/null +++ b/internal/collector/types/typesfakes/fake_collector_interface.go @@ -0,0 +1,208 @@ +// Code generated by counterfeiter. DO NOT EDIT. +package typesfakes + +import ( + "context" + "sync" + + "github.com/nginx/agent/v3/internal/collector/types" + "go.opentelemetry.io/collector/otelcol" +) + +type FakeCollectorInterface struct { + GetStateStub func() otelcol.State + getStateMutex sync.RWMutex + getStateArgsForCall []struct { + } + getStateReturns struct { + result1 otelcol.State + } + getStateReturnsOnCall map[int]struct { + result1 otelcol.State + } + RunStub func(context.Context) error + runMutex sync.RWMutex + runArgsForCall []struct { + arg1 context.Context + } + runReturns struct { + result1 error + } + runReturnsOnCall map[int]struct { + result1 error + } + ShutdownStub func() + shutdownMutex sync.RWMutex + shutdownArgsForCall []struct { + } + invocations map[string][][]interface{} + invocationsMutex sync.RWMutex +} + +func (fake *FakeCollectorInterface) GetState() otelcol.State { + fake.getStateMutex.Lock() + ret, specificReturn := fake.getStateReturnsOnCall[len(fake.getStateArgsForCall)] + fake.getStateArgsForCall = append(fake.getStateArgsForCall, struct { + }{}) + stub := fake.GetStateStub + fakeReturns := fake.getStateReturns + fake.recordInvocation("GetState", []interface{}{}) + fake.getStateMutex.Unlock() + if stub != nil { + return stub() + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeCollectorInterface) GetStateCallCount() int { + fake.getStateMutex.RLock() + defer fake.getStateMutex.RUnlock() + return len(fake.getStateArgsForCall) +} + +func (fake *FakeCollectorInterface) GetStateCalls(stub func() otelcol.State) { + fake.getStateMutex.Lock() + defer fake.getStateMutex.Unlock() + fake.GetStateStub = stub +} + +func (fake *FakeCollectorInterface) GetStateReturns(result1 otelcol.State) { + fake.getStateMutex.Lock() + defer fake.getStateMutex.Unlock() + fake.GetStateStub = nil + fake.getStateReturns = struct { + result1 otelcol.State + }{result1} +} + +func (fake *FakeCollectorInterface) GetStateReturnsOnCall(i int, result1 otelcol.State) { + fake.getStateMutex.Lock() + defer fake.getStateMutex.Unlock() + fake.GetStateStub = nil + if fake.getStateReturnsOnCall == nil { + fake.getStateReturnsOnCall = make(map[int]struct { + result1 otelcol.State + }) + } + fake.getStateReturnsOnCall[i] = struct { + result1 otelcol.State + }{result1} +} + +func (fake *FakeCollectorInterface) Run(arg1 context.Context) error { + fake.runMutex.Lock() + ret, specificReturn := fake.runReturnsOnCall[len(fake.runArgsForCall)] + fake.runArgsForCall = append(fake.runArgsForCall, struct { + arg1 context.Context + }{arg1}) + stub := fake.RunStub + fakeReturns := fake.runReturns + fake.recordInvocation("Run", []interface{}{arg1}) + fake.runMutex.Unlock() + if stub != nil { + return stub(arg1) + } + if specificReturn { + return ret.result1 + } + return fakeReturns.result1 +} + +func (fake *FakeCollectorInterface) RunCallCount() int { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + return len(fake.runArgsForCall) +} + +func (fake *FakeCollectorInterface) RunCalls(stub func(context.Context) error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = stub +} + +func (fake *FakeCollectorInterface) RunArgsForCall(i int) context.Context { + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + argsForCall := fake.runArgsForCall[i] + return argsForCall.arg1 +} + +func (fake *FakeCollectorInterface) RunReturns(result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + fake.runReturns = struct { + result1 error + }{result1} +} + +func (fake *FakeCollectorInterface) RunReturnsOnCall(i int, result1 error) { + fake.runMutex.Lock() + defer fake.runMutex.Unlock() + fake.RunStub = nil + if fake.runReturnsOnCall == nil { + fake.runReturnsOnCall = make(map[int]struct { + result1 error + }) + } + fake.runReturnsOnCall[i] = struct { + result1 error + }{result1} +} + +func (fake *FakeCollectorInterface) Shutdown() { + fake.shutdownMutex.Lock() + fake.shutdownArgsForCall = append(fake.shutdownArgsForCall, struct { + }{}) + stub := fake.ShutdownStub + fake.recordInvocation("Shutdown", []interface{}{}) + fake.shutdownMutex.Unlock() + if stub != nil { + fake.ShutdownStub() + } +} + +func (fake *FakeCollectorInterface) ShutdownCallCount() int { + fake.shutdownMutex.RLock() + defer fake.shutdownMutex.RUnlock() + return len(fake.shutdownArgsForCall) +} + +func (fake *FakeCollectorInterface) ShutdownCalls(stub func()) { + fake.shutdownMutex.Lock() + defer fake.shutdownMutex.Unlock() + fake.ShutdownStub = stub +} + +func (fake *FakeCollectorInterface) Invocations() map[string][][]interface{} { + fake.invocationsMutex.RLock() + defer fake.invocationsMutex.RUnlock() + fake.getStateMutex.RLock() + defer fake.getStateMutex.RUnlock() + fake.runMutex.RLock() + defer fake.runMutex.RUnlock() + fake.shutdownMutex.RLock() + defer fake.shutdownMutex.RUnlock() + copiedInvocations := map[string][][]interface{}{} + for key, value := range fake.invocations { + copiedInvocations[key] = value + } + return copiedInvocations +} + +func (fake *FakeCollectorInterface) recordInvocation(key string, args []interface{}) { + fake.invocationsMutex.Lock() + defer fake.invocationsMutex.Unlock() + if fake.invocations == nil { + fake.invocations = map[string][][]interface{}{} + } + if fake.invocations[key] == nil { + fake.invocations[key] = [][]interface{}{} + } + fake.invocations[key] = append(fake.invocations[key], args) +} + +var _ types.CollectorInterface = new(FakeCollectorInterface) diff --git a/internal/command/command_plugin_test.go b/internal/command/command_plugin_test.go index 6851195604..0481232f3f 100644 --- a/internal/command/command_plugin_test.go +++ b/internal/command/command_plugin_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/nginx/agent/v3/internal/bus/busfakes" + mpi "github.com/nginx/agent/v3/api/grpc/mpi/v1" "github.com/nginx/agent/v3/internal/bus" "github.com/nginx/agent/v3/internal/command/commandfakes" @@ -48,7 +50,7 @@ func TestCommandPlugin_Subscriptions(t *testing.T) { func TestCommandPlugin_Init(t *testing.T) { ctx := context.Background() - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() fakeCommandService := &commandfakes.FakeCommandService{} commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) @@ -67,7 +69,7 @@ func TestCommandPlugin_Init(t *testing.T) { func TestCommandPlugin_Process(t *testing.T) { ctx := context.Background() - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() fakeCommandService := &commandfakes.FakeCommandService{} commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) @@ -147,7 +149,7 @@ func TestCommandPlugin_monitorSubscribeChannel(t *testing.T) { t.Run(test.name, func(tt *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() commandPlugin := NewCommandPlugin(types.AgentConfig(), &grpcfakes.FakeGrpcConnectionInterface{}) err := commandPlugin.Init(ctx, messagePipe) diff --git a/internal/config/config.go b/internal/config/config.go index b6df4f0bb1..4bd76679ca 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -524,7 +524,7 @@ func resolveProcessors() Processors { return processors } -// generate self-signed certificate for OTEL receiver +// generate self-signed certificate for OTel receiver // nolint: revive func handleSelfSignedCertificates(col *Collector) error { if col.Receivers.OtlpReceivers != nil { diff --git a/internal/config/types.go b/internal/config/types.go index 0e36ae0819..3ad6809fba 100644 --- a/internal/config/types.go +++ b/internal/config/types.go @@ -91,6 +91,7 @@ type ( OtlpExporter struct { Server *ServerConfig `yaml:"-" mapstructure:"server"` TLS *TLSConfig `yaml:"-" mapstructure:"tls"` + Compression string `yaml:"-" mapstructure:"compression"` Authenticator string `yaml:"-" mapstructure:"authenticator"` } diff --git a/internal/file/file_plugin_test.go b/internal/file/file_plugin_test.go index ee1def1af1..2544fa4976 100644 --- a/internal/file/file_plugin_test.go +++ b/internal/file/file_plugin_test.go @@ -12,6 +12,8 @@ import ( "testing" "time" + "github.com/nginx/agent/v3/internal/bus/busfakes" + "github.com/google/uuid" "google.golang.org/protobuf/types/known/timestamppb" @@ -77,7 +79,7 @@ func TestFilePlugin_Process_NginxConfigUpdateTopic(t *testing.T) { fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection) err := filePlugin.Init(ctx, messagePipe) @@ -153,7 +155,7 @@ func TestFilePlugin_Process_ConfigApplyRequestTopic(t *testing.T) { t.Run(test.name, func(t *testing.T) { fakeFileManagerService := &filefakes.FakeFileManagerServiceInterface{} fakeFileManagerService.ConfigApplyReturns(test.configApplyStatus, test.configApplyReturnsErr) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection) err := filePlugin.Init(ctx, messagePipe) filePlugin.fileManagerService = fakeFileManagerService @@ -250,7 +252,7 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic(t *testing.T) { fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection) err := filePlugin.Init(ctx, messagePipe) @@ -305,7 +307,7 @@ func TestFilePlugin_Process_ConfigUploadRequestTopic_Failure(t *testing.T) { fakeFileServiceClient := &v1fakes.FakeFileServiceClient{} fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() filePlugin := NewFilePlugin(types.AgentConfig(), fakeGrpcConnection) err := filePlugin.Init(ctx, messagePipe) @@ -382,7 +384,7 @@ func TestFilePlugin_Process_ConfigApplyFailedTopic(t *testing.T) { fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} fakeGrpcConnection.FileServiceClientReturns(fakeFileServiceClient) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection) @@ -428,7 +430,7 @@ func TestFilePlugin_Process_ConfigApplyRollbackCompleteTopic(t *testing.T) { instance := protos.GetNginxOssInstance([]string{}) mockFileManager := &filefakes.FakeFileManagerServiceInterface{} - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() agentConfig := types.AgentConfig() fakeGrpcConnection := &grpcfakes.FakeGrpcConnectionInterface{} filePlugin := NewFilePlugin(agentConfig, fakeGrpcConnection) diff --git a/internal/resource/resource_plugin_test.go b/internal/resource/resource_plugin_test.go index 8f345f1ecb..d1bcc79d72 100644 --- a/internal/resource/resource_plugin_test.go +++ b/internal/resource/resource_plugin_test.go @@ -12,6 +12,8 @@ import ( "sort" "testing" + "github.com/nginx/agent/v3/internal/bus/busfakes" + "github.com/nginx/agent/v3/internal/model" "github.com/nginx/agent/v3/test/types" @@ -95,7 +97,7 @@ func TestResource_Process(t *testing.T) { fakeResourceService.AddInstancesReturns(protos.GetHostResource()) fakeResourceService.UpdateInstancesReturns(test.resource) fakeResourceService.DeleteInstancesReturns(test.resource) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() resourcePlugin := NewResource(types.AgentConfig()) resourcePlugin.resourceService = fakeResourceService @@ -154,7 +156,7 @@ func TestResource_Process_Apply(t *testing.T) { t.Run(test.name, func(tt *testing.T) { fakeResourceService := &resourcefakes.FakeResourceServiceInterface{} fakeResourceService.ApplyConfigReturns(test.applyErr) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() resourcePlugin := NewResource(types.AgentConfig()) resourcePlugin.resourceService = fakeResourceService @@ -222,7 +224,7 @@ func TestResource_Process_Rollback(t *testing.T) { t.Run(test.name, func(tt *testing.T) { fakeResourceService := &resourcefakes.FakeResourceServiceInterface{} fakeResourceService.ApplyConfigReturns(test.rollbackErr) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() resourcePlugin := NewResource(types.AgentConfig()) resourcePlugin.resourceService = fakeResourceService @@ -278,7 +280,7 @@ func TestResource_Init(t *testing.T) { ctx := context.Background() resourceService := resourcefakes.FakeResourceServiceInterface{} - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() messagePipe.RunWithoutInit(ctx) resourcePlugin := NewResource(types.AgentConfig()) diff --git a/internal/watcher/watcher_plugin_test.go b/internal/watcher/watcher_plugin_test.go index 9cca5d80d4..c4b74d09a7 100644 --- a/internal/watcher/watcher_plugin_test.go +++ b/internal/watcher/watcher_plugin_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/nginx/agent/v3/internal/bus/busfakes" + "github.com/google/uuid" "google.golang.org/protobuf/types/known/timestamppb" @@ -32,7 +34,7 @@ func TestWatcher_Init(t *testing.T) { watcherPlugin := NewWatcher(types.AgentConfig()) - messagePipe := bus.NewFakeMessagePipe() + messagePipe := busfakes.NewFakeMessagePipe() err := watcherPlugin.Init(ctx, messagePipe) defer func() { diff --git a/scripts/testing/otel-collector.yaml b/scripts/testing/otel-collector.yaml index 208f0e4d39..6196a1591b 100644 --- a/scripts/testing/otel-collector.yaml +++ b/scripts/testing/otel-collector.yaml @@ -6,6 +6,7 @@ receivers: endpoint: 0.0.0.0:4317 http: endpoint: 0.0.0.0:4318 + # prometheus: # config: # scrape_configs: diff --git a/test/config/collector/test-opentelemetry-collector-agent.yaml b/test/config/collector/test-opentelemetry-collector-agent.yaml index a4f5084a26..8080573361 100644 --- a/test/config/collector/test-opentelemetry-collector-agent.yaml +++ b/test/config/collector/test-opentelemetry-collector-agent.yaml @@ -53,7 +53,7 @@ exporters: sampling_thereafter: 200 extensions: health_check: - endpoint: "localhost:1337" + endpoint: "127.0.0.1:1337" headers_setter: headers: - action: "insert" diff --git a/test/docker/nginx-plus/deb/Dockerfile b/test/docker/nginx-plus/deb/Dockerfile index 0754e4bd02..392ed5fb70 100644 --- a/test/docker/nginx-plus/deb/Dockerfile +++ b/test/docker/nginx-plus/deb/Dockerfile @@ -11,6 +11,8 @@ WORKDIR /agent COPY ./build /agent/build COPY $ENTRY_POINT /agent/entrypoint.sh +ENV PLUS_VERSION=R32 + RUN --mount=type=secret,id=nginx-crt,dst=nginx-repo.crt \ --mount=type=secret,id=nginx-key,dst=nginx-repo.key \ set -x \ @@ -47,7 +49,7 @@ RUN --mount=type=secret,id=nginx-crt,dst=nginx-repo.crt \ && echo "Acquire::https::pkgs.nginx.com::Verify-Host \"true\";" >> /etc/apt/apt.conf.d/90nginx \ && echo "Acquire::https::pkgs.nginx.com::SslCert \"/etc/ssl/nginx/nginx-repo.crt\";" >> /etc/apt/apt.conf.d/90nginx \ && echo "Acquire::https::pkgs.nginx.com::SslKey \"/etc/ssl/nginx/nginx-repo.key\";" >> /etc/apt/apt.conf.d/90nginx \ - && printf "deb https://pkgs.nginx.com/plus/ubuntu `lsb_release -cs` nginx-plus\n" > /etc/apt/sources.list.d/nginx-plus.list \ + && printf "deb https://pkgs.nginx.com/plus/${PLUS_VERSION}/ubuntu/ `lsb_release -cs` nginx-plus\n" > /etc/apt/sources.list.d/nginx-plus.list \ && mkdir -p /etc/ssl/nginx \ && cat nginx-repo.crt > /etc/ssl/nginx/nginx-repo.crt \ && cat nginx-repo.key > /etc/ssl/nginx/nginx-repo.key \ diff --git a/test/helpers/network_utils.go b/test/helpers/network_utils.go new file mode 100644 index 0000000000..4a989b9bfa --- /dev/null +++ b/test/helpers/network_utils.go @@ -0,0 +1,46 @@ +// Copyright (c) F5, Inc. +// +// This source code is licensed under the Apache License, Version 2.0 license found in the +// LICENSE file in the root directory of this source tree. +package helpers + +import ( + "fmt" + "net" + "testing" + "time" + + "golang.org/x/exp/rand" +) + +// GetRandomPort generates a random port for testing and checks if a port is available by attempting to bind to it +func GetRandomPort(t *testing.T) (int, error) { + t.Helper() + rand.Seed(uint64(time.Now().UnixNano())) + + // Define the range for dynamic ports (49152–65535 as per IANA recommendation) + const minPort = 49152 + const maxPort = 65535 + + // try up to 10 times to get a random port + for i := 0; i < 10; i++ { + port := rand.Intn(maxPort-minPort+1) + minPort + + if isPortAvailable(port) { + return port, nil + } + } + + return 0, fmt.Errorf("could not find an available port after multiple attempts") +} + +// isPortAvailable checks if a port is available by attempting to bind to it +func isPortAvailable(port int) bool { + address := fmt.Sprintf("127.0.0.1:%d", port) + conn, err := net.Dial("tcp", address) + if conn != nil { + conn.Close() + } + + return err != nil +} diff --git a/test/helpers/nginx_plus.go b/test/helpers/nginx_plus.go index e095bc4fae..9103ecf198 100644 --- a/test/helpers/nginx_plus.go +++ b/test/helpers/nginx_plus.go @@ -6,7 +6,6 @@ package helpers import ( - "fmt" "net/http" "net/http/httptest" "testing" @@ -20,8 +19,6 @@ const endpointRootPath = "/api/9/" func NewMockNGINXPlusAPIServer(t *testing.T) *httptest.Server { t.Helper() return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - fmt.Printf("got path %s\n", req.URL.Path) - var payload string switch req.URL.Path { case "/api/": diff --git a/test/integration/grpc_management_plane_api_test.go b/test/integration/grpc_management_plane_api_test.go index 6952ffa507..dd4a4b19b6 100644 --- a/test/integration/grpc_management_plane_api_test.go +++ b/test/integration/grpc_management_plane_api_test.go @@ -117,7 +117,6 @@ func createContainerNetwork(ctx context.Context, tb testing.TB) *testcontainers. tb.Cleanup(func() { networkErr := containerNetwork.Remove(ctx) tb.Logf("Error removing container network: %v", networkErr) - require.NoError(tb, networkErr) }) return containerNetwork diff --git a/test/load/otel_collector_plugin_load_test.go b/test/load/otel_collector_plugin_load_test.go index e1e5bb3b4f..c3797d2e91 100644 --- a/test/load/otel_collector_plugin_load_test.go +++ b/test/load/otel_collector_plugin_load_test.go @@ -33,6 +33,7 @@ func TestMetric10kDPS(t *testing.T) { name := fmt.Sprintf("OTLP-%s-%s", runtime.GOOS, binary) sender := testbed.NewOTLPMetricDataSender(testbed.DefaultHost, 4317) receiver := testbed.NewOTLPDataReceiver(5643) + receiver = receiver.WithCompression("none") t.Run(name, func(t *testing.T) { require.NoError(t, err) diff --git a/test/types/config.go b/test/types/config.go index 3d570c9152..681aa0cc02 100644 --- a/test/types/config.go +++ b/test/types/config.go @@ -11,11 +11,11 @@ import ( "time" "github.com/nginx/agent/v3/internal/config" + "github.com/nginx/agent/v3/test/helpers" + "github.com/stretchr/testify/require" ) const ( - commandPort = 8981 - clientPermitWithoutStream = true clientTime = 50 * time.Second clientTimeout = 5 * time.Second @@ -27,10 +27,6 @@ const ( commonMultiplier = 0.2 reloadMonitoringPeriod = 400 * time.Millisecond - - randomPort1 = 1234 - randomPort2 = 4317 - randomPort3 = 1337 ) // Produces a populated Agent Config for testing usage. @@ -53,8 +49,9 @@ func AgentConfig() *config.Config { { Server: &config.ServerConfig{ Host: "127.0.0.1", - Port: randomPort1, + Port: 0, }, + Compression: "none", }, }, }, @@ -66,7 +63,18 @@ func AgentConfig() *config.Config { }, }, Receivers: config.Receivers{ - OtlpReceivers: OtlpReceivers(), + OtlpReceivers: []config.OtlpReceiver{ + { + Server: &config.ServerConfig{ + Host: "127.0.0.1", + Port: 0, + Type: 0, + }, + Auth: &config.AuthConfig{ + Token: "even-secreter-token", + }, + }, + }, HostMetrics: &config.HostMetrics{ CollectionInterval: time.Minute, InitialDelay: time.Second, @@ -82,8 +90,8 @@ func AgentConfig() *config.Config { Extensions: config.Extensions{ Health: &config.Health{ Server: &config.ServerConfig{ - Host: "localhost", - Port: randomPort3, + Host: "127.0.0.1", + Port: 0, Type: 0, }, }, @@ -105,7 +113,7 @@ func AgentConfig() *config.Config { Command: &config.Command{ Server: &config.ServerConfig{ Host: "127.0.0.1", - Port: commandPort, + Port: 0, Type: config.Grpc, }, Auth: &config.AuthConfig{ @@ -156,20 +164,21 @@ func OTelConfig(t *testing.T) *config.Config { ac := AgentConfig() ac.Collector.ConfigPath = filepath.Join(t.TempDir(), "otel-collector-config.yaml") - return ac -} + exporterPort, expErr := helpers.GetRandomPort(t) + require.NoError(t, expErr) + ac.Collector.Exporters.OtlpExporters[0].Server.Port = exporterPort -func OtlpReceivers() []config.OtlpReceiver { - return []config.OtlpReceiver{ - { - Server: &config.ServerConfig{ - Host: "localhost", - Port: randomPort2, - Type: 0, - }, - Auth: &config.AuthConfig{ - Token: "even-secreter-token", - }, - }, - } + receiverPort, recErr := helpers.GetRandomPort(t) + require.NoError(t, recErr) + ac.Collector.Receivers.OtlpReceivers[0].Server.Port = receiverPort + + healthPort, healthErr := helpers.GetRandomPort(t) + require.NoError(t, healthErr) + ac.Collector.Extensions.Health.Server.Port = healthPort + + commandPort, commandErr := helpers.GetRandomPort(t) + require.NoError(t, commandErr) + ac.Command.Server.Port = commandPort + + return ac }