Skip to content

Commit

Permalink
OpenTelemetry: add compression option (#920)
Browse files Browse the repository at this point in the history
* add compression option to Agent configuration. Options are none, gzip, snappy and zstd.

---------

Co-authored-by: aphralG <[email protected]>
Co-authored-by: dhurley <[email protected]>
Co-authored-by: oliveromahony <[email protected]>
Co-authored-by: RRashmit <[email protected]>
  • Loading branch information
5 people authored Nov 25, 2024
1 parent f010cf3 commit ca1c468
Show file tree
Hide file tree
Showing 32 changed files with 599 additions and 226 deletions.
3 changes: 1 addition & 2 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ indent_size = 4
indent_style = space
trim_trailing_whitespace = false


[{*.yaml,*.yml}]
[{*.yaml,*.yml, otelcol.tmpl}]
indent_size = 2
indent_style = space
trim_trailing_whitespace = true
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ jobs:
benchmark-data-dir-path: ""
# Set auto-push to false since GitHub API token is not given
auto-push: false
alert-threshold: '125%'
alert-threshold: '175%'
gh-pages-branch: "benchmark-results"
fail-on-alert: true

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ format: ## Format code

unit-test: $(TEST_BUILD_DIR) ## Run unit tests
@CGO_ENABLED=0 $(GOTEST) -count=1 -coverprofile=$(TEST_BUILD_DIR)/tmp_coverage.out -coverpkg=./... -covermode count ./internal/... ./api/... ./cmd/... ./pkg/...
@cat $(TEST_BUILD_DIR)/tmp_coverage.out | grep -v ".pb.go" | grep -v ".gen.go" | grep -v ".pb.validate.go" | grep -v "fake_" | grep -v "github.com/nginx/agent/v3/test/" > $(TEST_BUILD_DIR)/coverage.out
@cat $(TEST_BUILD_DIR)/tmp_coverage.out | grep -v ".pb.go" | grep -v ".gen.go" | grep -v ".pb.validate.go" | grep -v "fake_" | grep -v "_utils.go" | grep -v "github.com/nginx/agent/v3/test/" > $(TEST_BUILD_DIR)/coverage.out
@rm $(TEST_BUILD_DIR)/tmp_coverage.out
@$(GOTOOL) cover -html=$(TEST_BUILD_DIR)/coverage.out -o $(TEST_BUILD_DIR)/coverage.html
@printf "\nTotal code coverage: " && $(GOTOOL) cover -func=$(TEST_BUILD_DIR)/coverage.out | grep 'total:' | awk '{print $$3}'
Expand Down
2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/command.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/common.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion api/grpc/mpi/v1/files.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ require (
go.opentelemetry.io/otel v1.28.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e
golang.org/x/mod v0.21.0
golang.org/x/sync v0.8.0
google.golang.org/protobuf v1.34.2
Expand Down Expand Up @@ -297,7 +298,6 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/term v0.23.0 // indirect
golang.org/x/time v0.6.0 // indirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,70 @@
// This source code is licensed under the Apache License, Version 2.0 license found in the
// LICENSE file in the root directory of this source tree.

package bus
package busfakes

import (
"context"
"sync"

"github.com/nginx/agent/v3/internal/bus"
)

// FakeMessagePipe is a mock message pipe
type FakeMessagePipe struct {
plugins []Plugin
messages []*Message
processedMessages []*Message
plugins []bus.Plugin
messages []*bus.Message
processedMessages []*bus.Message
messagesLock sync.Mutex
}

var _ MessagePipeInterface = &FakeMessagePipe{}
var _ bus.MessagePipeInterface = &FakeMessagePipe{}

func NewFakeMessagePipe() *FakeMessagePipe {
return &FakeMessagePipe{
messagesLock: sync.Mutex{},
}
}

func (p *FakeMessagePipe) Register(size int, plugins []Plugin) error {
func (p *FakeMessagePipe) Register(size int, plugins []bus.Plugin) error {
p.plugins = append(p.plugins, plugins...)
return nil
}

func (p *FakeMessagePipe) DeRegister(ctx context.Context, pluginNames []string) error {
var plugins []Plugin
var plugins []bus.Plugin

plugins = p.findPlugins(pluginNames, plugins)

for _, plugin := range plugins {
index := getIndex(plugin.Info().Name, p.plugins)
index := p.GetIndex(plugin.Info().Name, p.plugins)
p.unsubscribePlugin(ctx, index, plugin)
}

return nil
}

func (p *FakeMessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin Plugin) {
func (p *FakeMessagePipe) GetIndex(pluginName string, plugins []bus.Plugin) int {
for index, plugin := range plugins {
if pluginName == plugin.Info().Name {
return index
}
}

return -1
}

func (p *FakeMessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin bus.Plugin) {
if index != -1 {
p.plugins = append(p.plugins[:index], p.plugins[index+1:]...)
plugin.Close(ctx)
err := plugin.Close(ctx)
if err != nil {
return
}
}
}

func (p *FakeMessagePipe) findPlugins(pluginNames []string, plugins []Plugin) []Plugin {
func (p *FakeMessagePipe) findPlugins(pluginNames []string, plugins []bus.Plugin) []bus.Plugin {
for _, name := range pluginNames {
for _, plugin := range p.plugins {
if plugin.Info().Name == name {
Expand All @@ -63,30 +78,30 @@ func (p *FakeMessagePipe) findPlugins(pluginNames []string, plugins []Plugin) []
return plugins
}

func (p *FakeMessagePipe) Process(ctx context.Context, msgs ...*Message) {
func (p *FakeMessagePipe) Process(_ context.Context, msgs ...*bus.Message) {
p.messagesLock.Lock()
defer p.messagesLock.Unlock()

p.messages = append(p.messages, msgs...)
}

func (p *FakeMessagePipe) GetMessages() []*Message {
func (p *FakeMessagePipe) GetMessages() []*bus.Message {
p.messagesLock.Lock()
defer p.messagesLock.Unlock()

return p.messages
}

func (p *FakeMessagePipe) GetProcessedMessages() []*Message {
func (p *FakeMessagePipe) GetProcessedMessages() []*bus.Message {
return p.processedMessages
}

func (p *FakeMessagePipe) ClearMessages() {
p.messagesLock.Lock()
defer p.messagesLock.Unlock()

p.processedMessages = []*Message{}
p.messages = []*Message{}
p.processedMessages = []*bus.Message{}
p.messages = []*bus.Message{}
}

func (p *FakeMessagePipe) Run(ctx context.Context) {
Expand All @@ -101,7 +116,7 @@ func (p *FakeMessagePipe) Run(ctx context.Context) {
}

func (p *FakeMessagePipe) RunWithoutInit(ctx context.Context) {
var message *Message
var message *bus.Message

for len(p.messages) > 0 {
message, p.messages = p.messages[0], p.messages[1:]
Expand All @@ -112,7 +127,7 @@ func (p *FakeMessagePipe) RunWithoutInit(ctx context.Context) {
}
}

func (p *FakeMessagePipe) GetPlugins() []Plugin {
func (p *FakeMessagePipe) GetPlugins() []bus.Plugin {
return p.plugins
}

Expand Down
17 changes: 10 additions & 7 deletions internal/bus/message_pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *MessagePipe) DeRegister(ctx context.Context, pluginNames []string) erro
plugins := p.findPlugins(pluginNames)

for _, plugin := range plugins {
index := getIndex(plugin.Info().Name, p.plugins)
index := p.GetIndex(plugin.Info().Name, p.plugins)

err := p.unsubscribePlugin(ctx, index, plugin)
if err != nil {
Expand Down Expand Up @@ -151,12 +151,15 @@ func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin P
if index != -1 {
p.plugins = append(p.plugins[:index], p.plugins[index+1:]...)

plugin.Close(ctx)
err := plugin.Close(ctx)
if err != nil {
return err
}

for _, subscription := range plugin.Subscriptions() {
err := p.bus.Unsubscribe(subscription, plugin.Process)
if err != nil {
return err
unsubErr := p.bus.Unsubscribe(subscription, plugin.Process)
if unsubErr != nil {
return unsubErr
}
}
}
Expand All @@ -165,7 +168,7 @@ func (p *MessagePipe) unsubscribePlugin(ctx context.Context, index int, plugin P
}

func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin {
plugins := []Plugin{}
var plugins []Plugin

for _, name := range pluginNames {
for _, plugin := range p.plugins {
Expand All @@ -178,7 +181,7 @@ func (p *MessagePipe) findPlugins(pluginNames []string) []Plugin {
return plugins
}

func getIndex(pluginName string, plugins []Plugin) int {
func (p *MessagePipe) GetIndex(pluginName string, plugins []Plugin) int {
for index, plugin := range plugins {
if pluginName == plugin.Info().Name {
return index
Expand Down
2 changes: 1 addition & 1 deletion internal/collector/factories.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func OTelComponentFactories() (otelcol.Factories, error) {
}

func createConnectorFactories() (map[component.Type]connector.Factory, error) {
connectorsList := []connector.Factory{}
var connectorsList []connector.Factory

return connector.MakeFactoryMap(connectorsList...)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/collector/factories_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ package collector
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
)

func TestOTelComponentFactories(t *testing.T) {
func TestOTelComponentFactoriesDefault(t *testing.T) {
factories, err := OTelComponentFactories()

require.NoError(t, err, "OTelComponentFactories should not return an error")
Expand Down
16 changes: 12 additions & 4 deletions internal/collector/otel_collector_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/nginx/agent/v3/api/grpc/mpi/v1"
"github.com/nginx/agent/v3/internal/backoff"
"github.com/nginx/agent/v3/internal/bus"
"github.com/nginx/agent/v3/internal/collector/types"
"github.com/nginx/agent/v3/internal/config"
"github.com/nginx/agent/v3/internal/model"
"go.opentelemetry.io/collector/otelcol"
Expand All @@ -30,7 +31,7 @@ const (
type (
// Collector The OTel collector plugin start an embedded OTel collector for metrics collection in the OTel format.
Collector struct {
service *otelcol.Collector
service types.CollectorInterface
cancel context.CancelFunc
config *config.Config
mu *sync.Mutex
Expand Down Expand Up @@ -71,6 +72,13 @@ func New(conf *config.Config) (*Collector, error) {
}, nil
}

func (oc *Collector) GetState() otelcol.State {
oc.mu.Lock()
defer oc.mu.Unlock()

return oc.service.GetState()
}

// Init initializes and starts the plugin
func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) error {
slog.InfoContext(ctx, "Starting OTel Collector plugin")
Expand Down Expand Up @@ -106,13 +114,13 @@ func (oc *Collector) Init(ctx context.Context, mp bus.MessagePipeInterface) erro
func (oc *Collector) processReceivers(ctx context.Context, receivers []config.OtlpReceiver) {
for _, receiver := range receivers {
if receiver.OtlpTLSConfig == nil {
slog.WarnContext(ctx, "OTEL receiver is configured without TLS. Connections are unencrypted.")
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.")
continue
}

if receiver.OtlpTLSConfig.GenerateSelfSignedCert {
slog.WarnContext(ctx,
"Self-signed certificate for OTEL receiver requested, "+
"Self-signed certificate for OTel receiver requested, "+
"this is not recommended for production environments.",
)

Expand All @@ -122,7 +130,7 @@ func (oc *Collector) processReceivers(ctx context.Context, receivers []config.Ot
)
}
} else {
slog.WarnContext(ctx, "OTEL receiver is configured without TLS. Connections are unencrypted.")
slog.WarnContext(ctx, "OTel receiver is configured without TLS. Connections are unencrypted.")
}
}
}
Expand Down
Loading

0 comments on commit ca1c468

Please sign in to comment.